Merge branch 'master' of gitee.com:openharmony/distributeddatamgr_datamgr_service into master

Signed-off-by: wellinleo <yangliu178@huawei.com>
This commit is contained in:
wellinleo 2024-11-18 13:46:31 +00:00 committed by Gitee
commit 2f46aced16
48 changed files with 683 additions and 144 deletions

View File

@ -43,8 +43,8 @@ AppPipeHandler::AppPipeHandler(const PipeInfo &pipeInfo)
softbusAdapter_ = SoftBusAdapter::GetInstance();
}
Status AppPipeHandler::SendData(const PipeInfo &pipeInfo, const DeviceId &deviceId, const DataInfo &dataInfo,
uint32_t totalLength, const MessageInfo &info)
std::pair<Status, int32_t> AppPipeHandler::SendData(const PipeInfo &pipeInfo, const DeviceId &deviceId,
const DataInfo &dataInfo, uint32_t totalLength, const MessageInfo &info)
{
return softbusAdapter_->SendData(pipeInfo, deviceId, dataInfo, totalLength, info);
}

View File

@ -42,7 +42,7 @@ public:
// stop DataChangeListener to watch data change;
Status StopWatchDataChange(const AppDataChangeListener *observer, const PipeInfo &pipeInfo);
// Send data to other device, function will be called back after sent to notify send result.
Status SendData(const PipeInfo &pipeInfo, const DeviceId &deviceId, const DataInfo &dataInfo,
std::pair<Status, int32_t> SendData(const PipeInfo &pipeInfo, const DeviceId &deviceId, const DataInfo &dataInfo,
uint32_t totalLength, const MessageInfo &info);
bool IsSameStartedOnPeer(const struct PipeInfo &pipeInfo, const struct DeviceId &peer);

View File

@ -58,13 +58,13 @@ Status AppPipeMgr::StopWatchDataChange(const AppDataChangeListener *observer, co
}
// Send data to other device, function will be called back after sent to notify send result.
Status AppPipeMgr::SendData(const PipeInfo &pipeInfo, const DeviceId &deviceId, const DataInfo &dataInfo,
uint32_t totalLength, const MessageInfo &info)
std::pair<Status, int32_t> AppPipeMgr::SendData(const PipeInfo &pipeInfo, const DeviceId &deviceId,
const DataInfo &dataInfo, uint32_t totalLength, const MessageInfo &info)
{
if (dataInfo.length > DataBuffer::MAX_TRANSFER_SIZE || dataInfo.length == 0 || dataInfo.data == nullptr ||
pipeInfo.pipeId.empty() || deviceId.deviceId.empty()) {
ZLOGW("Input is invalid, maxSize:%u, current size:%u", DataBuffer::MAX_TRANSFER_SIZE, dataInfo.length);
return Status::ERROR;
return std::make_pair(Status::ERROR, 0);
}
ZLOGD("pipeInfo:%s ,size:%u, total length:%u", pipeInfo.pipeId.c_str(), dataInfo.length, totalLength);
std::shared_ptr<AppPipeHandler> appPipeHandler;
@ -73,7 +73,7 @@ Status AppPipeMgr::SendData(const PipeInfo &pipeInfo, const DeviceId &deviceId,
auto it = dataBusMap_.find(pipeInfo.pipeId);
if (it == dataBusMap_.end()) {
ZLOGW("pipeInfo:%s not found", pipeInfo.pipeId.c_str());
return Status::KEY_NOT_FOUND;
return std::make_pair(Status::KEY_NOT_FOUND, 0);
}
appPipeHandler = it->second;
}

View File

@ -36,7 +36,7 @@ public:
Status StopWatchDataChange(const AppDataChangeListener *observer, const PipeInfo &pipeInfo);
// Send data to other device, function will be called back after sent to notify send result.
Status SendData(const PipeInfo &pipeInfo, const DeviceId &deviceId, const DataInfo &dataInfo,
std::pair<Status, int32_t> SendData(const PipeInfo &pipeInfo, const DeviceId &deviceId, const DataInfo &dataInfo,
uint32_t totalLength, const MessageInfo &info);
// start server
Status Start(const PipeInfo &pipeInfo);

View File

@ -48,8 +48,8 @@ Status CommunicationProviderImpl::StopWatchDataChange(const AppDataChangeListene
return appPipeMgr_.StopWatchDataChange(observer, pipeInfo);
}
Status CommunicationProviderImpl::SendData(const PipeInfo &pipeInfo, const DeviceId &deviceId, const DataInfo &dataInfo,
uint32_t totalLength, const MessageInfo &info)
std::pair<Status, int32_t> CommunicationProviderImpl::SendData(const PipeInfo &pipeInfo, const DeviceId &deviceId,
const DataInfo &dataInfo, uint32_t totalLength, const MessageInfo &info)
{
return appPipeMgr_.SendData(pipeInfo, deviceId, dataInfo, totalLength, info);
}

View File

@ -36,7 +36,7 @@ public:
Status StopWatchDataChange(const AppDataChangeListener *observer, const PipeInfo &pipeInfo) override;
// Send data to other device, function will be called back after sent to notify send result.
Status SendData(const PipeInfo &pipeInfo, const DeviceId &deviceId, const DataInfo &dataInfo,
std::pair<Status, int32_t> SendData(const PipeInfo &pipeInfo, const DeviceId &deviceId, const DataInfo &dataInfo,
uint32_t totalLength, const MessageInfo &info) override;
// start 1 server to listen data from other devices;

View File

@ -17,6 +17,7 @@
#include "communicator_context.h"
#include "log_print.h"
#include "kvstore_utils.h"
#include "softbus_error_code.h"
namespace OHOS::DistributedData {
using KvUtils = OHOS::DistributedKv::KvStoreUtils;
@ -72,28 +73,32 @@ Status CommunicatorContext::UnRegSessionListener(const DevChangeListener *observ
return Status::SUCCESS;
}
void CommunicatorContext::NotifySessionReady(const std::string &deviceId)
void CommunicatorContext::NotifySessionReady(const std::string &deviceId, int32_t errCode)
{
if (deviceId.empty()) {
ZLOGE("deviceId empty");
return;
}
devices_.Insert(deviceId, deviceId);
if (errCode == SOFTBUS_OK) {
devices_.Insert(deviceId, deviceId);
}
DeviceInfo devInfo;
devInfo.uuid = deviceId;
{
std::lock_guard<decltype(mutex_)> lock(mutex_);
for (const auto &observer : observers_) {
if (observer != nullptr) {
observer->OnSessionReady(devInfo);
observer->OnSessionReady(devInfo, errCode);
}
}
ZLOGI("Notify session begin, deviceId:%{public}s, observer count:%{public}zu",
KvUtils::ToBeAnonymous(deviceId).c_str(), observers_.size());
}
std::lock_guard<decltype(sessionMutex_)> sessionLockGard(sessionMutex_);
if (closeListener_) {
closeListener_(deviceId);
if (errCode == SOFTBUS_OK) {
std::lock_guard<decltype(sessionMutex_)> sessionLockGard(sessionMutex_);
if (closeListener_) {
closeListener_(deviceId);
}
}
}

View File

@ -139,14 +139,18 @@ DBStatus ProcessCommunicatorImpl::SendData(const DeviceInfos &dstDevInfo, const
const DataInfo dataInfo = { const_cast<uint8_t *>(data), length};
DeviceId destination;
destination.deviceId = dstDevInfo.identifier;
Status errCode = CommunicationProvider::GetInstance().SendData(pi, destination, dataInfo, totalLength);
auto [errCode, softBusErrCode] =
CommunicationProvider::GetInstance().SendData(pi, destination, dataInfo, totalLength);
if (errCode == Status::RATE_LIMIT) {
ZLOGD("commProvider_ opening session, status:%{public}d.", static_cast<int>(errCode));
ZLOGD("commProvider_ opening session, status:%{public}d.", static_cast<int>(softBusErrCode));
return DBStatus::RATE_LIMIT;
}
if (errCode != Status::SUCCESS) {
ZLOGE("commProvider_ SendData Fail.");
return DBStatus::DB_ERROR;
ZLOGE("commProvider_ SendData Fail. code:%{public}d", softBusErrCode);
if (softBusErrCode == 0) {
return DBStatus::DB_ERROR;
}
return static_cast<DBStatus>(softBusErrCode);
}
return DBStatus::OK;
}
@ -221,7 +225,7 @@ void ProcessCommunicatorImpl::OnDeviceChanged(const DeviceInfo &info, const Devi
onDeviceChangeHandler_(devInfo, (type == DeviceChangeType::DEVICE_ONLINE));
}
void ProcessCommunicatorImpl::OnSessionReady(const DeviceInfo &info) const
void ProcessCommunicatorImpl::OnSessionReady(const DeviceInfo &info, int32_t errCode) const
{
std::lock_guard<decltype(sessionMutex_)> lock(sessionMutex_);
if (sessionListener_ == nullptr) {
@ -229,7 +233,7 @@ void ProcessCommunicatorImpl::OnSessionReady(const DeviceInfo &info) const
}
DeviceInfos devInfos;
devInfos.identifier = info.uuid;
sessionListener_(devInfos);
sessionListener_(devInfos, errCode);
}
std::shared_ptr<ExtendHeaderHandle> ProcessCommunicatorImpl::GetExtendHeaderHandle(const ExtendInfo &info)

View File

@ -52,8 +52,8 @@ public:
Status StopWatchDataChange(const AppDataChangeListener *observer, const PipeInfo &pipeInfo);
// Send data to other device, function will be called back after sent to notify send result.
Status SendData(const PipeInfo &pipeInfo, const DeviceId &deviceId, const DataInfo &dataInfo, uint32_t length,
const MessageInfo &info);
std::pair<Status, int32_t> SendData(const PipeInfo &pipeInfo, const DeviceId &deviceId,
const DataInfo &dataInfo, uint32_t length, const MessageInfo &info);
bool IsSameStartedOnPeer(const struct PipeInfo &pipeInfo, const struct DeviceId &peer);
@ -83,7 +83,6 @@ public:
void OnDeviceChanged(const AppDistributedKv::DeviceInfo &info,
const AppDistributedKv::DeviceChangeType &type) const override;
private:
using Time = std::chrono::steady_clock::time_point;
using Duration = std::chrono::steady_clock::duration;
@ -95,6 +94,7 @@ private:
void Reuse(const PipeInfo &pipeInfo, const DeviceId &deviceId,
uint32_t qosType, std::shared_ptr<SoftBusClient> &conn);
void GetExpireTime(std::shared_ptr<SoftBusClient> &conn);
std::pair<Status, int32_t> OpenConnect(const std::shared_ptr<SoftBusClient> &conn, const DeviceId &deviceId);
static constexpr const char *PKG_NAME = "distributeddata-default";
static constexpr Time INVALID_NEXT = std::chrono::steady_clock::time_point::max();
static constexpr uint32_t QOS_COUNT = 3;

View File

@ -189,8 +189,8 @@ void SoftBusAdapter::GetExpireTime(std::shared_ptr<SoftBusClient> &conn)
}
}
Status SoftBusAdapter::SendData(const PipeInfo &pipeInfo, const DeviceId &deviceId, const DataInfo &dataInfo,
uint32_t length, const MessageInfo &info)
std::pair<Status, int32_t> SoftBusAdapter::SendData(const PipeInfo &pipeInfo, const DeviceId &deviceId,
const DataInfo &dataInfo, uint32_t length, const MessageInfo &info)
{
std::shared_ptr<SoftBusClient> conn;
bool isOHOSType = DmAdapter::GetInstance().IsOHOSType(deviceId.deviceId);
@ -218,29 +218,35 @@ Status SoftBusAdapter::SendData(const PipeInfo &pipeInfo, const DeviceId &device
Reuse(pipeInfo, deviceId, qosType, conn);
}
if (conn == nullptr) {
return Status::ERROR;
return std::make_pair(Status::ERROR, 0);
}
auto status = conn->CheckStatus();
if (status == Status::RATE_LIMIT) {
return Status::RATE_LIMIT;
return std::make_pair(Status::RATE_LIMIT, 0);
}
if (status != Status::SUCCESS) {
auto task = [this, connect = std::weak_ptr<SoftBusClient>(conn)]() {
auto conn = connect.lock();
if (conn != nullptr) {
conn->OpenConnect(&clientListener_);
}
};
auto networkId = DmAdapter::GetInstance().GetDeviceInfo(deviceId.deviceId).networkId;
ConnectManager::GetInstance()->ApplyConnect(networkId, task);
return Status::RATE_LIMIT;
return OpenConnect(conn, deviceId);
}
status = conn->SendData(dataInfo, &clientListener_);
if ((status != Status::NETWORK_ERROR) && (status != Status::RATE_LIMIT)) {
GetExpireTime(conn);
}
return status;
auto errCode = conn->GetSoftBusError();
return std::make_pair(status, errCode);
}
std::pair<Status, int32_t> SoftBusAdapter::OpenConnect(const std::shared_ptr<SoftBusClient> &conn,
const DeviceId &deviceId)
{
auto task = [this, connect = std::weak_ptr<SoftBusClient>(conn)]() {
auto conn = connect.lock();
if (conn != nullptr) {
conn->OpenConnect(&clientListener_);
}
};
auto networkId = DmAdapter::GetInstance().GetDeviceInfo(deviceId.deviceId).networkId;
ConnectManager::GetInstance()->ApplyConnect(networkId, task);
return std::make_pair(Status::RATE_LIMIT, 0);
}
void SoftBusAdapter::StartCloseSessionTask(const std::string &deviceId)

View File

@ -75,12 +75,20 @@ Status SoftBusClient::SendData(const DataInfo &dataInfo, const ISocketListener *
if (ret != SOFTBUS_OK) {
expireTime_ = std::chrono::steady_clock::now();
ZLOGE("send data to socket%{public}d failed, ret:%{public}d.", socket_, ret);
softBusError_ = ret;
return Status::ERROR;
}
softBusError_ = 0;
expireTime_ = CalcExpireTime();
return Status::SUCCESS;
}
int32_t SoftBusClient::GetSoftBusError()
{
std::lock_guard<std::mutex> lock(mutex_);
return softBusError_;
}
Status SoftBusClient::OpenConnect(const ISocketListener *listener)
{
std::lock_guard<std::mutex> lock(mutex_);
@ -114,10 +122,8 @@ Status SoftBusClient::OpenConnect(const ISocketListener *listener)
}
ZLOGI("Bind Start, device:%{public}s socket:%{public}d type:%{public}u",
KvStoreUtils::ToBeAnonymous(client->device_.deviceId).c_str(), clientSocket, type);
auto status = client->Open(clientSocket, QOS_INFOS[type % QOS_BUTT], listener);
if (status == Status::SUCCESS) {
Context::GetInstance().NotifySessionReady(client->device_.deviceId);
}
int32_t status = client->Open(clientSocket, QOS_INFOS[type % QOS_BUTT], listener);
Context::GetInstance().NotifySessionReady(client->device_.deviceId, status);
client->isOpening_.store(false);
};
Context::GetInstance().GetThreadPool()->Execute(task);
@ -138,7 +144,7 @@ Status SoftBusClient::CheckStatus()
return Status::ERROR;
}
Status SoftBusClient::Open(int32_t socket, const QosTV qos[], const ISocketListener *listener)
int32_t SoftBusClient::Open(int32_t socket, const QosTV qos[], const ISocketListener *listener)
{
int32_t status = ::Bind(socket, qos, QOS_COUNT, listener);
ZLOGI("Bind %{public}s,session:%{public}s,socketId:%{public}d",
@ -148,14 +154,15 @@ Status SoftBusClient::Open(int32_t socket, const QosTV qos[], const ISocketListe
ZLOGE("[Bind] device:%{public}s socket failed, session:%{public}s,result:%{public}d",
KvStoreUtils::ToBeAnonymous(device_.deviceId).c_str(), pipe_.pipeId.c_str(), status);
::Shutdown(socket);
return Status::NETWORK_ERROR;
return status;
}
UpdateExpireTime();
uint32_t mtu = 0;
std::tie(status, mtu) = GetMtu(socket);
if (status != SOFTBUS_OK) {
ZLOGE("GetMtu failed, session:%{public}s, socket:%{public}d", pipe_.pipeId.c_str(), socket_);
return Status::NETWORK_ERROR;
ZLOGE("GetMtu failed, session:%{public}s, socket:%{public}d, status:%{public}d", pipe_.pipeId.c_str(), socket_,
status);
return status;
}
{
std::lock_guard<std::mutex> lock(mutex_);
@ -166,7 +173,7 @@ Status SoftBusClient::Open(int32_t socket, const QosTV qos[], const ISocketListe
ZLOGI("open %{public}s, session:%{public}s success, socket:%{public}d",
KvStoreUtils::ToBeAnonymous(device_.deviceId).c_str(), pipe_.pipeId.c_str(), socket_);
ConnectManager::GetInstance()->OnSessionOpen(DmAdapter::GetInstance().GetDeviceInfo(device_.deviceId).networkId);
return Status::SUCCESS;
return status;
}
SoftBusClient::Time SoftBusClient::GetExpireTime() const

View File

@ -48,11 +48,12 @@ public:
int32_t GetSocket() const;
uint32_t GetQoSType() const;
void UpdateExpireTime();
int32_t GetSoftBusError();
bool needRemove = false;
bool isReuse = false;
private:
Status Open(int32_t socket, const QosTV qos[], const ISocketListener *listener);
int32_t Open(int32_t socket, const QosTV qos[], const ISocketListener *listener);
std::pair<int32_t, uint32_t> GetMtu(int32_t socket);
Time CalcExpireTime() const;
@ -85,6 +86,7 @@ private:
int32_t socket_ = INVALID_SOCKET_ID;
int32_t bindState_ = -1;
int32_t softBusError_ = 0;
};
} // namespace OHOS::AppDistributedKv

View File

@ -133,8 +133,8 @@ HWTEST_F(CommunicationProviderImplTest, CommunicationProvider005, TestSize.Level
const uint8_t *t = reinterpret_cast<const uint8_t*>(content.c_str());
DeviceId di17 = {"127.0.0.2"};
DataInfo data = { const_cast<uint8_t *>(t), static_cast<uint32_t>(content.length())};
Status status = CommunicationProvider::GetInstance().SendData(id17, di17, data, 0);
EXPECT_NE(status, Status::SUCCESS);
auto status = CommunicationProvider::GetInstance().SendData(id17, di17, data, 0);
EXPECT_NE(status.first, Status::SUCCESS);
CommunicationProvider::GetInstance().StopWatchDataChange(dataListener17, id17);
CommunicationProvider::GetInstance().Stop(id17);
delete dataListener17;
@ -234,8 +234,8 @@ HWTEST_F(CommunicationProviderImplTest, CommunicationProvider011, TestSize.Level
const uint8_t *t = reinterpret_cast<const uint8_t*>(content.c_str());
DeviceId di = {"DeviceId"};
DataInfo data = { const_cast<uint8_t *>(t), static_cast<uint32_t>(content.length())};
Status status = CommunicationProvider::GetInstance().SendData(id, di, data, 0);
EXPECT_EQ(status, Status::ERROR);
auto status = CommunicationProvider::GetInstance().SendData(id, di, data, 0);
EXPECT_EQ(status.first, Status::ERROR);
CommunicationProvider::GetInstance().StopWatchDataChange(dataListener, id);
CommunicationProvider::GetInstance().Stop(id);
delete dataListener;
@ -259,8 +259,8 @@ HWTEST_F(CommunicationProviderImplTest, CommunicationProvider012, TestSize.Level
const uint8_t *t = reinterpret_cast<const uint8_t*>(content.c_str());
DeviceId di = {""};
DataInfo data = { const_cast<uint8_t *>(t), static_cast<uint32_t>(content.length())};
Status status = CommunicationProvider::GetInstance().SendData(id, di, data, 0);
EXPECT_EQ(status, Status::ERROR);
auto status = CommunicationProvider::GetInstance().SendData(id, di, data, 0);
EXPECT_EQ(status.first, Status::ERROR);
CommunicationProvider::GetInstance().StopWatchDataChange(dataListener, id);
CommunicationProvider::GetInstance().Stop(id);
delete dataListener;
@ -282,8 +282,8 @@ HWTEST_F(CommunicationProviderImplTest, CommunicationProvider013, TestSize.Level
CommunicationProvider::GetInstance().Start(id);
DeviceId di = {"DeviceId"};
DataInfo data = {nullptr, 0};
Status status = CommunicationProvider::GetInstance().SendData(id, di, data, 0);
EXPECT_EQ(status, Status::ERROR);
auto status = CommunicationProvider::GetInstance().SendData(id, di, data, 0);
EXPECT_EQ(status.first, Status::ERROR);
CommunicationProvider::GetInstance().StopWatchDataChange(dataListener, id);
CommunicationProvider::GetInstance().Stop(id);
delete dataListener;
@ -307,8 +307,8 @@ HWTEST_F(CommunicationProviderImplTest, CommunicationProvider014, TestSize.Level
const uint8_t *t = reinterpret_cast<const uint8_t*>(content.c_str());
DeviceId di = {"DeviceId"};
DataInfo data = { const_cast<uint8_t *>(t), static_cast<uint32_t>(content.length())};
Status status = CommunicationProvider::GetInstance().SendData(id, di, data, 0);
EXPECT_EQ(status, Status::ERROR);
auto status = CommunicationProvider::GetInstance().SendData(id, di, data, 0);
EXPECT_EQ(status.first, Status::ERROR);
CommunicationProvider::GetInstance().StopWatchDataChange(dataListener, id);
CommunicationProvider::GetInstance().Stop(id);
delete dataListener;

View File

@ -155,8 +155,8 @@ HWTEST_F(SoftbusAdapterStandardTest, SendData, TestSize.Level1)
const uint8_t *t = reinterpret_cast<const uint8_t*>(content.c_str());
DeviceId di = {"DeviceId"};
DataInfo data = { const_cast<uint8_t *>(t), static_cast<uint32_t>(content.length())};
Status status = SoftBusAdapter::GetInstance()->SendData(id, di, data, 11, { MessageType::DEFAULT });
EXPECT_NE(status, Status::SUCCESS);
auto status = SoftBusAdapter::GetInstance()->SendData(id, di, data, 11, { MessageType::DEFAULT });
EXPECT_NE(status.first, Status::SUCCESS);
SoftBusAdapter::GetInstance()->StopWatchDataChange(dataListener, id);
delete dataListener;
}

View File

@ -32,7 +32,7 @@ public:
{
return ChangeLevelType::HIGH;
}
API_EXPORT virtual void OnSessionReady(const DeviceInfo &info) const {}
API_EXPORT virtual void OnSessionReady(const DeviceInfo &info, int32_t errCode) const {}
};
} // namespace AppDistributedKv
} // namespace OHOS

View File

@ -45,8 +45,9 @@ public:
virtual Status StopWatchDataChange(const AppDataChangeListener *observer, const PipeInfo &pipeInfo) = 0;
// Send data to other device, function will be called back after sent to notify send result
virtual Status SendData(const PipeInfo &pipeInfo, const DeviceId &deviceId, const DataInfo &dataInfo,
uint32_t totalLength, const MessageInfo &info = { MessageType::DEFAULT }) = 0;
virtual std::pair<Status, int32_t> SendData(const PipeInfo &pipeInfo, const DeviceId &deviceId,
const DataInfo &dataInfo, uint32_t totalLength,
const MessageInfo &info = { MessageType::DEFAULT }) = 0;
// start one server to listen data from other devices;
virtual Status Start(const PipeInfo &pipeInfo) = 0;

View File

@ -36,7 +36,7 @@ public:
std::shared_ptr<ExecutorPool> GetThreadPool();
Status RegSessionListener(const DevChangeListener *observer);
Status UnRegSessionListener(const DevChangeListener *observer);
void NotifySessionReady(const std::string &deviceId);
void NotifySessionReady(const std::string &deviceId, int32_t errCode);
void NotifySessionClose(const std::string &deviceId);
void SetSessionListener(const OnCloseAble &closeAbleCallback);
bool IsSessionReady(const std::string &deviceId);

View File

@ -57,7 +57,7 @@ public:
std::vector<DeviceInfos> GetRemoteOnlineDeviceInfosList() override;
bool IsSameProcessLabelStartedOnPeerDevice(const DeviceInfos &peerDevInfo) override;
void OnDeviceChanged(const DeviceInfo &info, const DeviceChangeType &type) const override;
void OnSessionReady(const DeviceInfo &info) const override;
void OnSessionReady(const DeviceInfo &info, int32_t errCode) const override;
API_EXPORT std::shared_ptr<DistributedDB::ExtendHeaderHandle> GetExtendHeaderHandle(
const DistributedDB::ExtendInfo &info) override;

View File

@ -17,9 +17,14 @@
#include "bundle_checker.h"
#include <memory>
#include "accesstoken_kit.h"
#include "bundlemgr/bundle_mgr_proxy.h"
#include "hap_token_info.h"
#include "ipc_skeleton.h"
#include "iservice_registry.h"
#include "log_print.h"
#include "system_ability_definition.h"
#include "utils/crypto.h"
namespace OHOS {
namespace DistributedData {
using namespace Security::AccessToken;
@ -56,27 +61,47 @@ bool BundleChecker::SetSwitchesInfo(const CheckerManager::Switches &switches)
return true;
}
std::string BundleChecker::GetBundleAppId(const CheckerManager::StoreInfo &info)
{
auto samgrProxy = SystemAbilityManagerClient::GetInstance().GetSystemAbilityManager();
if (samgrProxy == nullptr) {
ZLOGE("Failed to get system ability mgr.");
return "";
}
auto bundleMgrProxy = samgrProxy->GetSystemAbility(BUNDLE_MGR_SERVICE_SYS_ABILITY_ID);
if (bundleMgrProxy == nullptr) {
ZLOGE("Failed to Get BMS SA.");
return "";
}
auto bundleManager = iface_cast<AppExecFwk::IBundleMgr>(bundleMgrProxy);
if (bundleManager == nullptr) {
ZLOGE("Failed to get bundle manager");
return "";
}
int32_t userId = info.uid / OHOS::AppExecFwk::Constants::BASE_USER_RANGE;
std::string appId = bundleManager->GetAppIdByBundleName(info.bundleName, userId);
if (appId.empty()) {
ZLOGE("GetAppIdByBundleName failed appId:%{public}s, bundleName:%{public}s, uid:%{public}d",
appId.c_str(), info.bundleName.c_str(), userId);
}
return appId;
}
std::string BundleChecker::GetAppId(const CheckerManager::StoreInfo &info)
{
if (AccessTokenKit::GetTokenTypeFlag(info.tokenId) != TOKEN_HAP) {
return "";
}
HapTokenInfo tokenInfo;
auto result = AccessTokenKit::GetHapTokenInfo(info.tokenId, tokenInfo);
if (result != RET_SUCCESS) {
ZLOGE("token:0x%{public}x, result:%{public}d", info.tokenId, result);
return "";
}
if (!info.bundleName.empty() && tokenInfo.bundleName != info.bundleName) {
ZLOGE("bundlename:%{public}s <-> %{public}s", info.bundleName.c_str(), tokenInfo.bundleName.c_str());
auto appId = GetBundleAppId(info);
if (appId.empty()) {
return "";
}
auto it = trusts_.find(info.bundleName);
if (it != trusts_.end() && (it->second == tokenInfo.appID)) {
if (it != trusts_.end() && (it->second == appId)) {
return info.bundleName;
}
ZLOGD("bundleName:%{public}s, appId:%{public}s", info.bundleName.c_str(), tokenInfo.appID.c_str());
return Crypto::Sha256(tokenInfo.appID);
ZLOGD("bundleName:%{public}s, appId:%{public}s", info.bundleName.c_str(), appId.c_str());
return Crypto::Sha256(appId);
}
bool BundleChecker::IsValid(const CheckerManager::StoreInfo &info)
@ -98,18 +123,12 @@ bool BundleChecker::IsDistrust(const CheckerManager::StoreInfo &info)
if (AccessTokenKit::GetTokenTypeFlag(info.tokenId) != TOKEN_HAP) {
return false;
}
HapTokenInfo tokenInfo;
auto result = AccessTokenKit::GetHapTokenInfo(info.tokenId, tokenInfo);
if (result != RET_SUCCESS) {
ZLOGE("token:0x%{public}x, result:%{public}d", info.tokenId, result);
return false;
}
if (!info.bundleName.empty() && tokenInfo.bundleName != info.bundleName) {
ZLOGE("bundlename:%{public}s <-> %{public}s", info.bundleName.c_str(), tokenInfo.bundleName.c_str());
auto appId = GetBundleAppId(info);
if (appId.empty()) {
return false;
}
auto it = distrusts_.find(info.bundleName);
if (it != distrusts_.end() && (it->second == tokenInfo.appID)) {
if (it != distrusts_.end() && (it->second == appId)) {
return true;
}
return false;

View File

@ -44,6 +44,7 @@ private:
std::map<std::string, std::string> switches_;
std::vector<CheckerManager::StoreInfo> dynamicStores_;
std::vector<CheckerManager::StoreInfo> staticStores_;
std::string GetBundleAppId(const CheckerManager::StoreInfo &info);
};
} // namespace DistributedData
} // namespace OHOS

View File

@ -34,8 +34,9 @@ void KvStoreDeviceListener::OnDeviceChanged(
ZLOGI("device is %{public}d", type);
}
void KvStoreDeviceListener::OnSessionReady(const AppDistributedKv::DeviceInfo &info) const
void KvStoreDeviceListener::OnSessionReady(const AppDistributedKv::DeviceInfo &info, int32_t errCode) const
{
(void)errCode;
kvStoreDataService_.OnSessionReady(info);
}

View File

@ -26,7 +26,7 @@ public:
void OnDeviceChanged(
const AppDistributedKv::DeviceInfo &info, const AppDistributedKv::DeviceChangeType &type) const override;
AppDistributedKv::ChangeLevelType GetChangeLevelType() const override;
void OnSessionReady(const AppDistributedKv::DeviceInfo &info) const override;
void OnSessionReady(const AppDistributedKv::DeviceInfo &info, int32_t errCode = 0) const override;
private:
KvStoreDataService &kvStoreDataService_;

View File

@ -78,15 +78,14 @@ DistributedDB::DBStatus RouteHeadHandlerImpl::GetHeadDataSize(uint32_t &headSize
ZLOGI("meta data permitted");
return DistributedDB::OK;
}
auto devInfo = DmAdapter::GetInstance().GetDeviceInfo(session_.targetDeviceId);
if (devInfo.osType != OH_OS_TYPE) {
ZLOGD("devicdId:%{public}s is not oh type",
Anonymous::Change(session_.targetDeviceId).c_str());
return DistributedDB::OK;
}
bool flag = false;
auto peerCap = UpgradeManager::GetInstance().GetCapability(session_.targetDeviceId, flag);
auto devInfo = DmAdapter::GetInstance().GetDeviceInfo(session_.targetDeviceId);
if (devInfo.osType != OH_OS_TYPE && devInfo.deviceType ==
static_cast<uint32_t>(DistributedHardware::DmDeviceType::DEVICE_TYPE_CAR)) {
ZLOGI("type car set version. devicdId:%{public}s", Anonymous::Change(session_.targetDeviceId).c_str());
flag = true;
peerCap.version = CapMetaData::CURRENT_VERSION;
}
if (!flag) {
ZLOGI("get peer cap failed");
return DistributedDB::DB_ERROR;

View File

@ -66,6 +66,7 @@ Session SessionManager::GetSession(const SessionPoint &from, const std::string &
return session;
}
for (const auto &user : users) {
aclParams.accCallee.userId = user.id;
auto [isPermitted, isSameAccount] = AuthDelegate::GetInstance()->CheckAccess(from.userId, user.id,
targetDeviceId, aclParams);
if (isPermitted) {
@ -78,7 +79,7 @@ Session SessionManager::GetSession(const SessionPoint &from, const std::string &
}
}
}
ZLOGD("access to peer user:%{public}d", session.targetUserIds[0]);
ZLOGD("access to peer users:%{public}s", DistributedData::Serializable::Marshall(session.targetUserIds).c_str());
return session;
}

View File

@ -153,25 +153,6 @@ HWTEST_F(CheckerManagerTest, SystemCheckerIVI, TestSize.Level0)
ASSERT_TRUE(CheckerManager::GetInstance().IsValid(info));
}
/**
* @tc.name: BundleChecker
* @tc.desc: checker the bundle name of the bundle abilities.
* @tc.type: FUNC
* @tc.require:
* @tc.author: Sven Wang
*/
HWTEST_F(CheckerManagerTest, BundleChecker, TestSize.Level0)
{
CheckerManager::StoreInfo storeInfo;
storeInfo.uid = 2000000;
storeInfo.tokenId = AccessTokenKit::GetHapTokenID(100, "ohos.test.demo", 0);
storeInfo.bundleName = "ohos.test.demo";
HapTokenInfo tokenInfo;
AccessTokenKit::GetHapTokenInfo(storeInfo.tokenId, tokenInfo);
ASSERT_EQ(Crypto::Sha256(tokenInfo.appID), CheckerManager::GetInstance().GetAppId(storeInfo));
ASSERT_TRUE(CheckerManager::GetInstance().IsValid(storeInfo));
}
/**
* @tc.name: IsDynamic
* @tc.desc: checker data type.

View File

@ -196,6 +196,7 @@ int32_t CloudServiceImpl::DoClean(const CloudInfo &cloudInfo, const std::map<std
syncManager_.StopCloudSync(cloudInfo.user);
for (const auto &[bundle, action] : actions) {
if (!cloudInfo.Exist(bundle)) {
ZLOGW("user:%{public}d, bundleName:%{public}s is not exist", cloudInfo.user, bundle.c_str());
continue;
}
SchemaMeta schemaMeta;

View File

@ -27,6 +27,7 @@
#include "metadata/store_meta_data.h"
#include "result_set.h"
#include "serializable/serializable.h"
#include "value_object.h"
namespace OHOS::DataShare {
class DBDelegate {
@ -40,6 +41,7 @@ public:
virtual std::string Query(
const std::string &sql, const std::vector<std::string> &selectionArgs = std::vector<std::string>()) = 0;
virtual std::shared_ptr<NativeRdb::ResultSet> QuerySql(const std::string &sql) = 0;
virtual std::pair<int, int64_t> UpdateSql(const std::string &sql) = 0;
virtual bool IsInvalid() = 0;
static void SetExecutorPool(std::shared_ptr<ExecutorPool> executor);
static void EraseStoreCache(const int32_t tokenId);

View File

@ -42,7 +42,7 @@ const char* g_backupFiles[] = {
const char* BACKUP_SUFFIX = ".backup";
// If isBackUp is true, remove db backup files. Otherwise remove source db files.
void KvDelegate::RemoveDbFile(bool isBackUp)
void KvDelegate::RemoveDbFile(bool isBackUp) const
{
for (auto &fileName: g_backupFiles) {
std::string dbPath = path_ + "/" + fileName;

View File

@ -46,7 +46,7 @@ private:
bool RestoreIfNeed(int32_t dbStatus);
void Backup();
void Restore();
void RemoveDbFile(bool isBackUp);
void RemoveDbFile(bool isBackUp) const;
bool CopyFile(bool isBackup);
std::recursive_mutex mutex_;
std::string path_;

View File

@ -90,15 +90,10 @@ std::pair<int, RdbStoreConfig> RdbDelegate::GetConfig(const DistributedData::Sto
}
RdbDelegate::RdbDelegate(const DistributedData::StoreMetaData &meta, int version,
bool registerFunction, const std::string &extUriData, const std::string &backup)
bool registerFunction, const std::string &extUri, const std::string &backup)
: tokenId_(meta.tokenId), bundleName_(meta.bundleName), storeName_(meta.storeId),
haMode_(meta.haMode), extUri_(extUri), backup_(backup)
{
tokenId_ = meta.tokenId;
bundleName_ = meta.bundleName;
storeName_ = meta.storeId;
extUri_ = extUriData;
haMode_ = meta.haMode;
backup_ = backup;
auto [err, config] = GetConfig(meta, registerFunction);
if (err != E_OK) {
ZLOGW("Get rdbConfig failed, errCode is %{public}d, dir is %{public}s", err,
@ -290,6 +285,22 @@ std::shared_ptr<NativeRdb::ResultSet> RdbDelegate::QuerySql(const std::string &s
return resultSet;
}
std::pair<int, int64_t> RdbDelegate::UpdateSql(const std::string &sql)
{
if (store_ == nullptr) {
ZLOGE("store is null");
return std::make_pair(E_ERROR, 0);
}
auto[ret, outValue] = store_->Execute(sql);
if (ret != E_OK) {
ZLOGE("execute update sql failed, err:%{public}d", ret);
return std::make_pair(ret, 0);
}
int64_t rowCount = 0;
outValue.GetLong(rowCount);
return std::make_pair(ret, rowCount);
}
bool RdbDelegate::IsInvalid()
{
return store_ == nullptr;

View File

@ -38,6 +38,7 @@ public:
const int32_t callingPid) override;
std::string Query(const std::string &sql, const std::vector<std::string> &selectionArgs) override;
std::shared_ptr<NativeRdb::ResultSet> QuerySql(const std::string &sql) override;
std::pair<int, int64_t> UpdateSql(const std::string &sql) override;
bool IsInvalid() override;
std::pair<int64_t, int64_t> InsertEx(const std::string &tableName,
const DataShareValuesBucket &valuesBucket) override;

View File

@ -91,7 +91,7 @@ void SchedulerManager::DestoryTimerTask(int64_t timerId)
void SchedulerManager::ResetTimerTask(int64_t timerId, int64_t reminderTime)
{
TimeServiceClient::GetInstance()->StopTimer(timerId);
// This start also means reset, new one will replace old one
TimeServiceClient::GetInstance()->StartTimer(timerId, static_cast<uint64_t>(reminderTime));
}

View File

@ -18,18 +18,20 @@
namespace OHOS::DataShare {
bool TemplateNode::Marshal(DistributedData::Serializable::json &node) const
{
bool ret = SetValue(node[GET_NAME(predicates)], predicates);
bool ret = SetValue(node[GET_NAME(update)], update);
ret = SetValue(node[GET_NAME(predicates)], predicates);
ret = ret && SetValue(node[GET_NAME(scheduler)], scheduler);
return ret;
}
bool TemplateNode::Unmarshal(const DistributedData::Serializable::json &node)
{
bool ret = GetValue(node, GET_NAME(predicates), predicates);
bool ret = GetValue(node, GET_NAME(update), update);
ret = GetValue(node, GET_NAME(predicates), predicates);
return ret && GetValue(node, GET_NAME(scheduler), scheduler);
}
TemplateNode::TemplateNode(const Template &tpl) : scheduler(tpl.scheduler_)
TemplateNode::TemplateNode(const Template &tpl) : update(tpl.update_), scheduler(tpl.scheduler_)
{
for (auto &item:tpl.predicates_) {
predicates.emplace_back(item.key_, item.selectSql_);
@ -42,7 +44,7 @@ Template TemplateNode::ToTemplate() const
for (const auto &predicate: predicates) {
nodes.emplace_back(predicate.key, predicate.selectSql);
}
return Template(nodes, scheduler);
return Template(update, nodes, scheduler);
}
bool TemplateRootNode::Marshal(DistributedData::Serializable::json &node) const

View File

@ -36,6 +36,7 @@ struct TemplateNode final: public DistributedData::Serializable {
bool Unmarshal(const json &node) override;
Template ToTemplate() const;
private:
std::string update;
std::vector<PredicatesNode> predicates;
std::string scheduler;
};

View File

@ -15,13 +15,124 @@
#define LOG_TAG "ObserverProxy"
#include "data_share_obs_proxy.h"
#include "datashare_errno.h"
#include "itypes_util.h"
#include "datashare_itypes_utils.h"
#include "log_print.h"
namespace OHOS {
namespace DataShare {
static constexpr int REQUEST_CODE = 0;
int RdbObserverProxy::CreateAshmem(RdbChangeNode &changeNode)
{
OHOS::sptr<Ashmem> memory = Ashmem::CreateAshmem(ASHMEM_NAME, DATA_SIZE_ASHMEM_TRANSFER_LIMIT);
if (memory == nullptr) {
ZLOGE("failed to create Ashmem instance.");
return E_ERROR;
}
bool mapRet = memory->MapReadAndWriteAshmem();
if (!mapRet) {
ZLOGE("failed to map read and write ashmem, ret=%{public}d", mapRet);
memory->CloseAshmem();
return E_ERROR;
}
if (changeNode.memory_ != nullptr) {
ZLOGE(
"Unknown error: changeNode.memory_ should be null, but something is there %{public}p",
(void *)changeNode.memory_
);
return E_ERROR;
}
changeNode.memory_ = memory;
return E_OK;
}
int RdbObserverProxy::WriteAshmem(RdbChangeNode &changeNode, void *data, int len, int &offset)
{
if (changeNode.memory_ == nullptr) {
ZLOGE("changeNode memory is nullptr.");
return E_ERROR;
}
bool writeRet = changeNode.memory_->WriteToAshmem(data, len, offset);
if (!writeRet) {
ZLOGE("failed to write into ashmem, ret=%{public}d", writeRet);
changeNode.memory_->UnmapAshmem();
changeNode.memory_->CloseAshmem();
changeNode.memory_ = nullptr;
return E_ERROR;
}
offset += len;
return E_OK;
}
int RdbObserverProxy::SerializeDataIntoAshmem(RdbChangeNode &changeNode)
{
if (changeNode.memory_ == nullptr) {
ZLOGE("changeNode.memory_ is nullptr");
return E_ERROR;
}
// move data
// simple serialization: [vec_size(int32); str1_len(int32), str1; str2_len(int32), str2; ...],
// total byte size is recorded in changeNode.size
int offset = 0;
// 4 byte for length int
int intLen = 4;
int dataSize = changeNode.data_.size();
if (WriteAshmem(changeNode, (void *)&dataSize, intLen, offset) != E_OK) {
ZLOGE("failed to write data with len %{public}d, offset %{public}d.", intLen, offset);
return E_ERROR;
}
for (int i = 0; i < dataSize; i++) {
const char *str = changeNode.data_[i].c_str();
int strLen = changeNode.data_[i].length();
// write length int
if (WriteAshmem(changeNode, (void *)&strLen, intLen, offset) != E_OK) {
ZLOGE("failed to write data with index %{public}d, len %{public}d, offset %{public}d.", i, intLen, offset);
return E_ERROR;
}
// write str
if (WriteAshmem(changeNode, (void *)str, strLen, offset) != E_OK) {
ZLOGE("failed to write data with index %{public}d, len %{public}d, offset %{public}d.", i, strLen, offset);
return E_ERROR;
}
}
changeNode.size_ = offset;
return E_OK;
}
int RdbObserverProxy::PrepareRdbChangeNodeData(RdbChangeNode &changeNode)
{
// If data size is bigger than the limit, move it to the shared memory
// 4 byte for length int
int intByteLen = 4;
int size = intByteLen;
for (int i = 0; i < changeNode.data_.size(); i++) {
size += intByteLen;
size += changeNode.data_[i].length();
}
if (size > DATA_SIZE_ASHMEM_TRANSFER_LIMIT) {
ZLOGE("Data to write into ashmem is %{public}d bytes, over 10M.", size);
return E_ERROR;
}
if (size > DATA_SIZE_IPC_TRANSFER_LIMIT) {
ZLOGD("Data size is over 200k, transfer it by the shared memory");
if (RdbObserverProxy::CreateAshmem(changeNode) != E_OK) {
ZLOGE("failed to create ashmem.");
return E_ERROR;
}
if (RdbObserverProxy::SerializeDataIntoAshmem(changeNode) != E_OK) {
ZLOGE("failed to serialize data into ashmem.");
return E_ERROR;
}
// clear original data spot
changeNode.data_.clear();
changeNode.isSharedMemory_ = true;
ZLOGD("Preparation done. Data size: %{public}d", changeNode.size_);
}
return E_OK;
}
void RdbObserverProxy::OnChangeFromRdb(RdbChangeNode &changeNode)
{
MessageParcel parcel;
@ -29,6 +140,11 @@ void RdbObserverProxy::OnChangeFromRdb(RdbChangeNode &changeNode)
return;
}
if (RdbObserverProxy::PrepareRdbChangeNodeData(changeNode) != E_OK) {
ZLOGE("failed to prepare RdbChangeNode data.");
return;
}
if (!ITypesUtil::Marshal(parcel, changeNode)) {
ZLOGE("failed to WriteParcelable changeNode ");
return;

View File

@ -28,6 +28,10 @@ public:
void OnChangeFromRdb(RdbChangeNode &changeNode) override;
private:
int PrepareRdbChangeNodeData(RdbChangeNode &changeNode);
int CreateAshmem(RdbChangeNode &changeNode);
int WriteAshmem(RdbChangeNode &changeNode, void *data, int len, int &offset);
int SerializeDataIntoAshmem(RdbChangeNode &changeNode);
static inline BrokerDelegator<RdbObserverProxy> delegator_;
};

View File

@ -216,7 +216,7 @@ int32_t DataShareServiceImpl::AddTemplate(const std::string &uri, const int64_t
return templateStrategy_.Execute(context, [&uri, &tpltId, &tplt, &context]() -> int32_t {
auto result = TemplateManager::GetInstance().Add(
Key(uri, tpltId.subscriberId_, tpltId.bundleName_), context->currentUserId, tplt);
RdbSubscriberManager::GetInstance().Emit(context->uri, tpltId.subscriberId_, context);
RdbSubscriberManager::GetInstance().Emit(context->uri, tpltId.subscriberId_, tpltId.bundleName_, context);
return result;
});
}

View File

@ -117,7 +117,7 @@ int32_t DataShareServiceStub::OnAddTemplate(MessageParcel &data, MessageParcel &
std::string uri;
int64_t subscriberId;
Template tpl;
if (!ITypesUtil::Unmarshal(data, uri, subscriberId, tpl.predicates_, tpl.scheduler_)) {
if (!ITypesUtil::Unmarshal(data, uri, subscriberId, tpl.update_, tpl.predicates_, tpl.scheduler_)) {
ZLOGW("read device list failed.");
return -1;
}

View File

@ -85,7 +85,21 @@ bool Unmarshalling(PredicateTemplateNode &predicateTemplateNode, MessageParcel &
template<>
bool Marshalling(const RdbChangeNode &changeNode, MessageParcel &parcel)
{
return ITypesUtil::Marshal(parcel, changeNode.uri_, changeNode.templateId_, changeNode.data_);
bool firstPart = ITypesUtil::Marshal(
parcel, changeNode.uri_, changeNode.templateId_, changeNode.data_, changeNode.isSharedMemory_);
if (!firstPart) {
return false;
}
if (changeNode.isSharedMemory_) {
if (changeNode.memory_ == nullptr) {
ZLOGE("Used shared memory but ashmem is nullptr.");
return false;
}
if (!parcel.WriteAshmem(changeNode.memory_)) {
return false;
}
}
return ITypesUtil::Marshal(parcel, changeNode.size_);
}
template<>

View File

@ -343,6 +343,13 @@ int RdbSubscriberManager::Notify(const Key &key, int32_t userId, const std::vect
}
changeNode.data_.emplace_back("{\"" + predicate.key_ + "\":" + result + "}");
}
if (!tpl.update_.empty()) {
auto [errCode, rowCount] = delegate->UpdateSql(tpl.update_);
if (errCode != E_OK) {
ZLOGE("Update failed, err:%{public}d, %{public}s, %{public}" PRId64 ", %{public}s",
errCode, DistributedData::Anonymous::Change(key.uri).c_str(), key.subscriberId, key.bundleName.c_str());
}
}
ZLOGI("emit, valSize: %{public}zu, dataSize:%{public}zu, uri:%{public}s,",
val.size(), changeNode.data_.size(), DistributedData::Anonymous::Change(changeNode.uri_).c_str());
@ -359,7 +366,8 @@ void RdbSubscriberManager::Clear()
rdbCache_.Clear();
}
void RdbSubscriberManager::Emit(const std::string &uri, int64_t subscriberId, std::shared_ptr<Context> context)
void RdbSubscriberManager::Emit(const std::string &uri, int64_t subscriberId,
const std::string &bundleName, std::shared_ptr<Context> context)
{
if (!URIUtils::IsDataProxyURI(uri)) {
return;
@ -376,8 +384,9 @@ void RdbSubscriberManager::Emit(const std::string &uri, int64_t subscriberId, st
SetObserverNotifyOnEnabled(val);
return false;
});
SchedulerManager::GetInstance().Execute(
uri, context->currentUserId, context->calledSourceDir, context->version, context->calledBundleName);
Key executeKey(uri, subscriberId, bundleName);
SchedulerManager::GetInstance().Execute(executeKey, context->currentUserId,
context->calledSourceDir, context->version);
}
RdbSubscriberManager::ObserverNode::ObserverNode(const sptr<IDataProxyRdbObserver> &observer,
uint32_t firstCallerTokenId, uint32_t callerTokenId)

View File

@ -61,7 +61,8 @@ public:
void Delete(uint32_t callerTokenId);
int Disable(const Key &key, uint32_t firstCallerTokenId);
int Enable(const Key &key, std::shared_ptr<Context> context);
void Emit(const std::string &uri, int64_t subscriberId, std::shared_ptr<Context> context);
void Emit(const std::string &uri, int64_t subscriberId, const std::string &bundleName,
std::shared_ptr<Context> context);
void Emit(const std::string &uri, std::shared_ptr<Context> context);
void Emit(const std::string &uri, int32_t userId, DistributedData::StoreMetaData &metaData);
void EmitByKey(const Key &key, int32_t userId, const std::string &rdbPath, int version);

View File

@ -1089,7 +1089,7 @@ Status KVDBServiceImpl::DoSyncInOrder(
if (uuids.empty()) {
ZLOGW("no device seqId:0x%{public}" PRIx64 " remote:%{public}zu appId:%{public}s storeId:%{public}s",
info.seqId, info.devices.size(), meta.bundleName.c_str(), Anonymous::Change(meta.storeId).c_str());
return Status::ERROR;
return Status::DEVICE_NOT_ONLINE;
}
if (IsNeedMetaSync(meta, uuids)) {
auto recv = DeviceMatrix::GetInstance().GetRecvLevel(uuids[0],
@ -1237,8 +1237,14 @@ Status KVDBServiceImpl::DoComplete(const StoreMetaData &meta, const SyncInfo &in
SYNC_STORE_ID, Anonymous::Change(meta.storeId), SYNC_APP_ID, meta.bundleName, CONCURRENT_ID,
std::to_string(info.syncId), DATA_TYPE, meta.dataType);
std::map<std::string, Status> result;
for (auto &[key, status] : dbResult) {
result[key] = ConvertDbStatus(status);
if (AccessTokenKit::GetTokenTypeFlag(meta.tokenId) != TOKEN_HAP) {
for (auto &[key, status] : dbResult) {
result[key] = ConvertDbStatusNative(status);
}
} else {
for (auto &[key, status] : dbResult) {
result[key] = ConvertDbStatus(status);
}
}
for (const auto &device : info.devices) {
auto it = result.find(device);
@ -1261,6 +1267,18 @@ Status KVDBServiceImpl::DoComplete(const StoreMetaData &meta, const SyncInfo &in
return SUCCESS;
}
Status KVDBServiceImpl::ConvertDbStatusNative(DBStatus status)
{
auto innerStatus = static_cast<int32_t>(status);
if (innerStatus < 0) {
return static_cast<Status>(status);
} else if (status == DBStatus::COMM_FAILURE) {
return Status::DEVICE_NOT_ONLINE;
} else {
return ConvertDbStatus(status);
}
}
uint32_t KVDBServiceImpl::GetSyncDelayTime(uint32_t delay, const StoreId &storeId)
{
if (delay != 0) {
@ -1514,7 +1532,15 @@ Status KVDBServiceImpl::RemoveDeviceData(const AppId &appId, const StoreId &stor
if (device.empty()) {
ret = store->Clean({}, KVDBGeneralStore::NEARBY_DATA, "");
} else {
ret = store->Clean({ DMAdapter::GetInstance().ToUUID(device) }, KVDBGeneralStore::NEARBY_DATA, "");
auto uuid = DMAdapter::GetInstance().ToUUID(device);
if (uuid.empty()) {
auto tokenId = IPCSkeleton::GetCallingTokenID();
if (AccessTokenKit::GetTokenTypeFlag(tokenId) != TOKEN_HAP) {
ZLOGW("uuid convert empty! device:%{public}s", Anonymous::Change(device).c_str());
uuid = device;
}
}
ret = store->Clean({ uuid }, KVDBGeneralStore::NEARBY_DATA, "");
}
return ConvertGeneralErr(GeneralError(ret));
}

View File

@ -156,6 +156,7 @@ private:
StoreMetaData &meta, bool &isFindIdentifier);
int32_t DoTripleAutoLaunch(StoreMetaData &meta);
DistributedDB::SecurityOption ConvertSecurity(int securityLevel);
Status ConvertDbStatusNative(DBStatus status);
static Factory factory_;
ConcurrentMap<uint32_t, SyncAgent> syncAgents_;
std::shared_ptr<ExecutorPool> executors_;

View File

@ -970,6 +970,7 @@ ohos_unittest("DataShareServiceImplTest") {
"${data_service_path}/service/data_share/sys_event_subscriber.cpp",
"${data_service_path}/service/kvdb/user_delegate.cpp",
"${data_service_path}/service/permission/src/permit_delegate.cpp",
"data_share_obs_proxy_test.cpp",
"data_share_profile_config_test.cpp",
"data_share_service_impl_test.cpp",
"data_share_service_stub_test.cpp",

View File

@ -0,0 +1,318 @@
/*
* Copyright (c) 2024 Huawei Device Co., Ltd.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#define LOG_TAG "DataShareObsProxyTest"
#include <gtest/gtest.h>
#include <unistd.h>
#include "data_share_obs_proxy.h"
#include "datashare_errno.h"
#include "log_print.h"
namespace OHOS::Test {
using namespace testing::ext;
using namespace OHOS::DataShare;
std::string BUNDLE_NAME = "ohos.datasharetest.demo";
constexpr int64_t TEST_SUB_ID = 100;
class DataShareObsProxyTest : public testing::Test {
public:
static void SetUpTestCase(void){};
static void TearDownTestCase(void){};
void SetUp(){};
void TearDown(){};
};
RdbChangeNode SampleRdbChangeNode()
{
TemplateId tplId;
tplId.subscriberId_ = TEST_SUB_ID;
tplId.bundleName_ = BUNDLE_NAME;
RdbChangeNode node;
node.uri_ = std::string("");
node.templateId_ = tplId;
node.data_ = std::vector<std::string>();
node.isSharedMemory_ = false;
node.memory_ = nullptr;
node.size_ = 0;
return node;
}
/**
* @tc.name: CreateAshmem
* @tc.desc: test CreateAshmem function
* @tc.type: FUNC
* @tc.require:
*/
HWTEST_F(DataShareObsProxyTest, CreateAshmem, TestSize.Level1)
{
ZLOGI("CreateAshmem starts");
RdbChangeNode node = SampleRdbChangeNode();
OHOS::sptr<IRemoteObject> fake = nullptr;
RdbObserverProxy proxy(fake);
int ret = proxy.CreateAshmem(node);
EXPECT_EQ(ret, DataShare::E_OK);
EXPECT_NE(node.memory_, nullptr);
ZLOGI("CreateAshmem ends");
}
/**
* @tc.name: WriteAshmem001
* @tc.desc: test WriteAshmem function. Write an int
* @tc.type: FUNC
* @tc.require:
*/
HWTEST_F(DataShareObsProxyTest, WriteAshmem001, TestSize.Level1)
{
ZLOGI("WriteAshmem001 starts");
RdbChangeNode node = SampleRdbChangeNode();
OHOS::sptr<IRemoteObject> fake = nullptr;
RdbObserverProxy proxy(fake);
int retCreate = proxy.CreateAshmem(node);
EXPECT_EQ(retCreate, DataShare::E_OK);
int len = 10;
int intLen = 4;
int offset = 0;
int ret = proxy.WriteAshmem(node, (void *)&len, intLen, offset);
EXPECT_EQ(ret, E_OK);
EXPECT_EQ(offset, intLen);
// read from the start
const void *read = node.memory_->ReadFromAshmem(intLen, 0);
EXPECT_NE(read, nullptr);
int lenRead = *reinterpret_cast<const int *>(read);
EXPECT_EQ(len, lenRead);
ZLOGI("WriteAshmem001 ends");
}
/**
* @tc.name: WriteAshmem002
* @tc.desc: test WriteAshmem function. Write a str
* @tc.type: FUNC
* @tc.require:
*/
HWTEST_F(DataShareObsProxyTest, WriteAshmem002, TestSize.Level1)
{
ZLOGI("WriteAshmem002 starts");
RdbChangeNode node = SampleRdbChangeNode();
OHOS::sptr<IRemoteObject> fake = nullptr;
RdbObserverProxy proxy(fake);
int retCreate = proxy.CreateAshmem(node);
EXPECT_EQ(retCreate, DataShare::E_OK);
std::string string("Hello World");
const char *str = string.c_str();
int len = string.length();
int offset = 0;
int ret = proxy.WriteAshmem(node, (void *)str, len, offset);
EXPECT_EQ(ret, E_OK);
EXPECT_EQ(offset, len);
// read from the start
const void *read = node.memory_->ReadFromAshmem(len, 0);
EXPECT_NE(read, nullptr);
const char *strRead = reinterpret_cast<const char *>(read);
std::string stringRead(strRead, len);
EXPECT_EQ(stringRead, string);
ZLOGI("WriteAshmem002 ends");
}
/**
* @tc.name: WriteAshmem003
* @tc.desc: test WriteAshmem function with error
* @tc.type: FUNC
* @tc.require:
*/
HWTEST_F(DataShareObsProxyTest, WriteAshmem003, TestSize.Level1)
{
ZLOGI("WriteAshmem003 starts");
RdbChangeNode node = SampleRdbChangeNode();
OHOS::sptr<IRemoteObject> fake = nullptr;
RdbObserverProxy proxy(fake);
OHOS::sptr<Ashmem> memory = Ashmem::CreateAshmem("WriteAshmem003", 2);
EXPECT_NE(memory, nullptr);
bool mapRet = memory->MapReadAndWriteAshmem();
ASSERT_TRUE(mapRet);
node.memory_ = memory;
int len = 10;
int offset = 0;
int ret = proxy.WriteAshmem(node, (void *)&len, 4, offset);
EXPECT_EQ(ret, E_ERROR);
ZLOGI("WriteAshmem003 ends");
}
/**
* @tc.name: SerializeDataIntoAshmem
* @tc.desc: test SerializeDataIntoAshmem function
* @tc.type: FUNC
* @tc.require:
*/
HWTEST_F(DataShareObsProxyTest, SerializeDataIntoAshmem, TestSize.Level1)
{
ZLOGI("SerializeDataIntoAshmem starts");
RdbChangeNode node = SampleRdbChangeNode();
OHOS::sptr<IRemoteObject> fake = nullptr;
RdbObserverProxy proxy(fake);
int retCreate = proxy.CreateAshmem(node);
EXPECT_EQ(retCreate, E_OK);
// Push three times
node.data_.push_back(BUNDLE_NAME);
node.data_.push_back(BUNDLE_NAME);
node.data_.push_back(BUNDLE_NAME);
int intLen = 4;
// item length size + (str length size + str length) * 3
int offset = intLen + (intLen + strlen(BUNDLE_NAME.c_str())) * 3;
int retSe = proxy.SerializeDataIntoAshmem(node);
EXPECT_EQ(retSe, E_OK);
EXPECT_EQ(node.size_, offset);
offset = 0;
const void *vecLenRead = node.memory_->ReadFromAshmem(intLen, offset);
EXPECT_NE(vecLenRead, nullptr);
int vecLen = *reinterpret_cast<const int *>(vecLenRead);
EXPECT_EQ(vecLen, 3);
offset += intLen;
// 3 strings in the vec
for (int i = 0; i < 3; i++) {
const void *strLenRead = node.memory_->ReadFromAshmem(intLen, offset);
EXPECT_NE(strLenRead, nullptr);
int strLen = *reinterpret_cast<const int *>(strLenRead);
EXPECT_EQ(strLen, BUNDLE_NAME.length());
offset += intLen;
const void *strRead = node.memory_->ReadFromAshmem(strLen, offset);
EXPECT_NE(strRead, nullptr);
const char *str = reinterpret_cast<const char *>(strRead);
std::string stringRead(str, strLen);
EXPECT_EQ(stringRead, BUNDLE_NAME);
offset += strLen;
}
ZLOGI("SerializeDataIntoAshmem ends");
}
/**
* @tc.name: SerializeDataIntoAshmem002
* @tc.desc: test SerializeDataIntoAshmem function
* @tc.type: FUNC
* @tc.require:
*/
HWTEST_F(DataShareObsProxyTest, SerializeDataIntoAshmem002, TestSize.Level1)
{
ZLOGI("SerializeDataIntoAshmem starts");
RdbChangeNode node = SampleRdbChangeNode();
OHOS::sptr<IRemoteObject> fake = nullptr;
RdbObserverProxy proxy(fake);
// Push three times
node.data_.push_back(BUNDLE_NAME);
node.data_.push_back(BUNDLE_NAME);
node.data_.push_back(BUNDLE_NAME);
// memory too small for vec length
OHOS::sptr<Ashmem> memory = Ashmem::CreateAshmem("SerializeDataIntoAshmem002", 2);
EXPECT_NE(memory, nullptr);
bool mapRet = memory->MapReadAndWriteAshmem();
ASSERT_TRUE(mapRet);
node.memory_ = memory;
int retSe = proxy.SerializeDataIntoAshmem(node);
EXPECT_EQ(retSe, E_ERROR);
EXPECT_EQ(node.size_, 0);
EXPECT_EQ(node.data_.size(), 3);
ASSERT_FALSE(node.isSharedMemory_);
// memory too small for string length
OHOS::sptr<Ashmem> memory2 = Ashmem::CreateAshmem("SerializeDataIntoAshmem002", 6);
EXPECT_NE(memory2, nullptr);
mapRet = memory2->MapReadAndWriteAshmem();
ASSERT_TRUE(mapRet);
node.memory_ = memory2;
retSe = proxy.SerializeDataIntoAshmem(node);
EXPECT_EQ(retSe, E_ERROR);
EXPECT_EQ(node.size_, 0);
EXPECT_EQ(node.data_.size(), 3);
ASSERT_FALSE(node.isSharedMemory_);
// memory too small for string
OHOS::sptr<Ashmem> memory3 = Ashmem::CreateAshmem("SerializeDataIntoAshmem002", 10);
EXPECT_NE(memory3, nullptr);
mapRet = memory3->MapReadAndWriteAshmem();
ASSERT_TRUE(mapRet);
node.memory_ = memory3;
retSe = proxy.SerializeDataIntoAshmem(node);
EXPECT_EQ(retSe, E_ERROR);
EXPECT_EQ(node.size_, 0);
EXPECT_EQ(node.data_.size(), 3);
ASSERT_FALSE(node.isSharedMemory_);
ZLOGI("SerializeDataIntoAshmem002 ends");
}
/**
* @tc.name: PreparationData
* @tc.desc: test PrepareRdbChangeNodeData function
* @tc.type: FUNC
* @tc.require:
*/
HWTEST_F(DataShareObsProxyTest, PreparationData, TestSize.Level1)
{
ZLOGI("PreparationData starts");
RdbChangeNode node = SampleRdbChangeNode();
OHOS::sptr<IRemoteObject> fake = nullptr;
RdbObserverProxy proxy(fake);
// Push three times, less than 200k
node.data_.push_back(BUNDLE_NAME);
node.data_.push_back(BUNDLE_NAME);
node.data_.push_back(BUNDLE_NAME);
int ret = proxy.PrepareRdbChangeNodeData(node);
EXPECT_EQ(ret, E_OK);
EXPECT_EQ(node.data_.size(), 3);
EXPECT_FALSE(node.isSharedMemory_);
// Try to fake a 200k data. BUNDLE_NAME is 23 byte long and 7587 BUNDLE_NAMEs is over 200k.
for (int i = 0; i < 7587; i++) {
node.data_.push_back(BUNDLE_NAME);
}
ret = proxy.PrepareRdbChangeNodeData(node);
EXPECT_EQ(ret, E_OK);
EXPECT_EQ(node.data_.size(), 0);
EXPECT_TRUE(node.isSharedMemory_);
// Try to fake data over 10M. Write data of such size should fail because it exceeds the limit.
for (int i = 0; i < 388362; i++) {
node.data_.push_back(BUNDLE_NAME);
}
ret = proxy.PrepareRdbChangeNodeData(node);
EXPECT_EQ(ret, E_ERROR);
ZLOGI("PreparationData ends");
}
} // namespace OHOS::Test

View File

@ -178,7 +178,7 @@ HWTEST_F(DataShareSubscriberManagersTest, Emit, TestSize.Level1)
{
auto context = std::make_shared<Context>(DATA_SHARE_URI_TEST);
RdbSubscriberManager::GetInstance().Emit(DATA_SHARE_URI_TEST, context);
RdbSubscriberManager::GetInstance().Emit(DATA_SHARE_URI_TEST, TEST_SUB_ID, context);
RdbSubscriberManager::GetInstance().Emit(DATA_SHARE_URI_TEST, TEST_SUB_ID, BUNDLE_NAME_TEST, context);
TemplateId tpltId;
tpltId.subscriberId_ = TEST_SUB_ID;
tpltId.bundleName_ = BUNDLE_NAME_TEST;

View File

@ -128,6 +128,8 @@ ohos_fuzztest("CloudServiceStubFuzzTest") {
"kv_store:distributeddata_inner",
"kv_store:distributeddata_mgr",
"relational_store:native_rdb",
"safwk:system_ability_fwk",
"samgr:samgr_proxy",
]
}

View File

@ -117,6 +117,8 @@ ohos_fuzztest("KvdbServiceStubFuzzTest") {
"ipc:ipc_core",
"kv_store:distributeddata_inner",
"kv_store:distributeddata_mgr",
"safwk:system_ability_fwk",
"samgr:samgr_proxy",
]
}