mirror of
https://gitee.com/openharmony/distributeddatamgr_datamgr_service
synced 2024-11-27 00:51:12 +00:00
commit
057558f833
@ -43,8 +43,8 @@ AppPipeHandler::AppPipeHandler(const PipeInfo &pipeInfo)
|
||||
softbusAdapter_ = SoftBusAdapter::GetInstance();
|
||||
}
|
||||
|
||||
Status AppPipeHandler::SendData(const PipeInfo &pipeInfo, const DeviceId &deviceId, const DataInfo &dataInfo,
|
||||
uint32_t totalLength, const MessageInfo &info)
|
||||
std::pair<Status, int32_t> AppPipeHandler::SendData(const PipeInfo &pipeInfo, const DeviceId &deviceId,
|
||||
const DataInfo &dataInfo, uint32_t totalLength, const MessageInfo &info)
|
||||
{
|
||||
return softbusAdapter_->SendData(pipeInfo, deviceId, dataInfo, totalLength, info);
|
||||
}
|
||||
|
@ -42,7 +42,7 @@ public:
|
||||
// stop DataChangeListener to watch data change;
|
||||
Status StopWatchDataChange(const AppDataChangeListener *observer, const PipeInfo &pipeInfo);
|
||||
// Send data to other device, function will be called back after sent to notify send result.
|
||||
Status SendData(const PipeInfo &pipeInfo, const DeviceId &deviceId, const DataInfo &dataInfo,
|
||||
std::pair<Status, int32_t> SendData(const PipeInfo &pipeInfo, const DeviceId &deviceId, const DataInfo &dataInfo,
|
||||
uint32_t totalLength, const MessageInfo &info);
|
||||
bool IsSameStartedOnPeer(const struct PipeInfo &pipeInfo, const struct DeviceId &peer);
|
||||
|
||||
|
@ -58,13 +58,13 @@ Status AppPipeMgr::StopWatchDataChange(const AppDataChangeListener *observer, co
|
||||
}
|
||||
|
||||
// Send data to other device, function will be called back after sent to notify send result.
|
||||
Status AppPipeMgr::SendData(const PipeInfo &pipeInfo, const DeviceId &deviceId, const DataInfo &dataInfo,
|
||||
uint32_t totalLength, const MessageInfo &info)
|
||||
std::pair<Status, int32_t> AppPipeMgr::SendData(const PipeInfo &pipeInfo, const DeviceId &deviceId,
|
||||
const DataInfo &dataInfo, uint32_t totalLength, const MessageInfo &info)
|
||||
{
|
||||
if (dataInfo.length > DataBuffer::MAX_TRANSFER_SIZE || dataInfo.length == 0 || dataInfo.data == nullptr ||
|
||||
pipeInfo.pipeId.empty() || deviceId.deviceId.empty()) {
|
||||
ZLOGW("Input is invalid, maxSize:%u, current size:%u", DataBuffer::MAX_TRANSFER_SIZE, dataInfo.length);
|
||||
return Status::ERROR;
|
||||
return std::make_pair(Status::ERROR, 0);
|
||||
}
|
||||
ZLOGD("pipeInfo:%s ,size:%u, total length:%u", pipeInfo.pipeId.c_str(), dataInfo.length, totalLength);
|
||||
std::shared_ptr<AppPipeHandler> appPipeHandler;
|
||||
@ -73,7 +73,7 @@ Status AppPipeMgr::SendData(const PipeInfo &pipeInfo, const DeviceId &deviceId,
|
||||
auto it = dataBusMap_.find(pipeInfo.pipeId);
|
||||
if (it == dataBusMap_.end()) {
|
||||
ZLOGW("pipeInfo:%s not found", pipeInfo.pipeId.c_str());
|
||||
return Status::KEY_NOT_FOUND;
|
||||
return std::make_pair(Status::KEY_NOT_FOUND, 0);
|
||||
}
|
||||
appPipeHandler = it->second;
|
||||
}
|
||||
|
@ -36,7 +36,7 @@ public:
|
||||
Status StopWatchDataChange(const AppDataChangeListener *observer, const PipeInfo &pipeInfo);
|
||||
|
||||
// Send data to other device, function will be called back after sent to notify send result.
|
||||
Status SendData(const PipeInfo &pipeInfo, const DeviceId &deviceId, const DataInfo &dataInfo,
|
||||
std::pair<Status, int32_t> SendData(const PipeInfo &pipeInfo, const DeviceId &deviceId, const DataInfo &dataInfo,
|
||||
uint32_t totalLength, const MessageInfo &info);
|
||||
// start server
|
||||
Status Start(const PipeInfo &pipeInfo);
|
||||
|
@ -48,8 +48,8 @@ Status CommunicationProviderImpl::StopWatchDataChange(const AppDataChangeListene
|
||||
return appPipeMgr_.StopWatchDataChange(observer, pipeInfo);
|
||||
}
|
||||
|
||||
Status CommunicationProviderImpl::SendData(const PipeInfo &pipeInfo, const DeviceId &deviceId, const DataInfo &dataInfo,
|
||||
uint32_t totalLength, const MessageInfo &info)
|
||||
std::pair<Status, int32_t> CommunicationProviderImpl::SendData(const PipeInfo &pipeInfo, const DeviceId &deviceId,
|
||||
const DataInfo &dataInfo, uint32_t totalLength, const MessageInfo &info)
|
||||
{
|
||||
return appPipeMgr_.SendData(pipeInfo, deviceId, dataInfo, totalLength, info);
|
||||
}
|
||||
|
@ -36,7 +36,7 @@ public:
|
||||
Status StopWatchDataChange(const AppDataChangeListener *observer, const PipeInfo &pipeInfo) override;
|
||||
|
||||
// Send data to other device, function will be called back after sent to notify send result.
|
||||
Status SendData(const PipeInfo &pipeInfo, const DeviceId &deviceId, const DataInfo &dataInfo,
|
||||
std::pair<Status, int32_t> SendData(const PipeInfo &pipeInfo, const DeviceId &deviceId, const DataInfo &dataInfo,
|
||||
uint32_t totalLength, const MessageInfo &info) override;
|
||||
|
||||
// start 1 server to listen data from other devices;
|
||||
|
@ -17,6 +17,7 @@
|
||||
#include "communicator_context.h"
|
||||
#include "log_print.h"
|
||||
#include "kvstore_utils.h"
|
||||
#include "softbus_error_code.h"
|
||||
|
||||
namespace OHOS::DistributedData {
|
||||
using KvUtils = OHOS::DistributedKv::KvStoreUtils;
|
||||
@ -72,28 +73,32 @@ Status CommunicatorContext::UnRegSessionListener(const DevChangeListener *observ
|
||||
return Status::SUCCESS;
|
||||
}
|
||||
|
||||
void CommunicatorContext::NotifySessionReady(const std::string &deviceId)
|
||||
void CommunicatorContext::NotifySessionReady(const std::string &deviceId, int32_t errCode)
|
||||
{
|
||||
if (deviceId.empty()) {
|
||||
ZLOGE("deviceId empty");
|
||||
return;
|
||||
}
|
||||
devices_.Insert(deviceId, deviceId);
|
||||
if (errCode == SOFTBUS_OK) {
|
||||
devices_.Insert(deviceId, deviceId);
|
||||
}
|
||||
DeviceInfo devInfo;
|
||||
devInfo.uuid = deviceId;
|
||||
{
|
||||
std::lock_guard<decltype(mutex_)> lock(mutex_);
|
||||
for (const auto &observer : observers_) {
|
||||
if (observer != nullptr) {
|
||||
observer->OnSessionReady(devInfo);
|
||||
observer->OnSessionReady(devInfo, errCode);
|
||||
}
|
||||
}
|
||||
ZLOGI("Notify session begin, deviceId:%{public}s, observer count:%{public}zu",
|
||||
KvUtils::ToBeAnonymous(deviceId).c_str(), observers_.size());
|
||||
}
|
||||
std::lock_guard<decltype(sessionMutex_)> sessionLockGard(sessionMutex_);
|
||||
if (closeListener_) {
|
||||
closeListener_(deviceId);
|
||||
if (errCode == SOFTBUS_OK) {
|
||||
std::lock_guard<decltype(sessionMutex_)> sessionLockGard(sessionMutex_);
|
||||
if (closeListener_) {
|
||||
closeListener_(deviceId);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -139,14 +139,18 @@ DBStatus ProcessCommunicatorImpl::SendData(const DeviceInfos &dstDevInfo, const
|
||||
const DataInfo dataInfo = { const_cast<uint8_t *>(data), length};
|
||||
DeviceId destination;
|
||||
destination.deviceId = dstDevInfo.identifier;
|
||||
Status errCode = CommunicationProvider::GetInstance().SendData(pi, destination, dataInfo, totalLength);
|
||||
auto [errCode, softBusErrCode] =
|
||||
CommunicationProvider::GetInstance().SendData(pi, destination, dataInfo, totalLength);
|
||||
if (errCode == Status::RATE_LIMIT) {
|
||||
ZLOGD("commProvider_ opening session, status:%{public}d.", static_cast<int>(errCode));
|
||||
ZLOGD("commProvider_ opening session, status:%{public}d.", static_cast<int>(softBusErrCode));
|
||||
return DBStatus::RATE_LIMIT;
|
||||
}
|
||||
if (errCode != Status::SUCCESS) {
|
||||
ZLOGE("commProvider_ SendData Fail.");
|
||||
return DBStatus::DB_ERROR;
|
||||
ZLOGE("commProvider_ SendData Fail. code:%{public}d", softBusErrCode);
|
||||
if (softBusErrCode == 0) {
|
||||
return DBStatus::DB_ERROR;
|
||||
}
|
||||
return static_cast<DBStatus>(softBusErrCode);
|
||||
}
|
||||
return DBStatus::OK;
|
||||
}
|
||||
@ -221,7 +225,7 @@ void ProcessCommunicatorImpl::OnDeviceChanged(const DeviceInfo &info, const Devi
|
||||
onDeviceChangeHandler_(devInfo, (type == DeviceChangeType::DEVICE_ONLINE));
|
||||
}
|
||||
|
||||
void ProcessCommunicatorImpl::OnSessionReady(const DeviceInfo &info) const
|
||||
void ProcessCommunicatorImpl::OnSessionReady(const DeviceInfo &info, int32_t errCode) const
|
||||
{
|
||||
std::lock_guard<decltype(sessionMutex_)> lock(sessionMutex_);
|
||||
if (sessionListener_ == nullptr) {
|
||||
@ -229,7 +233,7 @@ void ProcessCommunicatorImpl::OnSessionReady(const DeviceInfo &info) const
|
||||
}
|
||||
DeviceInfos devInfos;
|
||||
devInfos.identifier = info.uuid;
|
||||
sessionListener_(devInfos);
|
||||
sessionListener_(devInfos, errCode);
|
||||
}
|
||||
|
||||
std::shared_ptr<ExtendHeaderHandle> ProcessCommunicatorImpl::GetExtendHeaderHandle(const ExtendInfo &info)
|
||||
|
@ -52,8 +52,8 @@ public:
|
||||
Status StopWatchDataChange(const AppDataChangeListener *observer, const PipeInfo &pipeInfo);
|
||||
|
||||
// Send data to other device, function will be called back after sent to notify send result.
|
||||
Status SendData(const PipeInfo &pipeInfo, const DeviceId &deviceId, const DataInfo &dataInfo, uint32_t length,
|
||||
const MessageInfo &info);
|
||||
std::pair<Status, int32_t> SendData(const PipeInfo &pipeInfo, const DeviceId &deviceId,
|
||||
const DataInfo &dataInfo, uint32_t length, const MessageInfo &info);
|
||||
|
||||
bool IsSameStartedOnPeer(const struct PipeInfo &pipeInfo, const struct DeviceId &peer);
|
||||
|
||||
@ -83,7 +83,6 @@ public:
|
||||
|
||||
void OnDeviceChanged(const AppDistributedKv::DeviceInfo &info,
|
||||
const AppDistributedKv::DeviceChangeType &type) const override;
|
||||
|
||||
private:
|
||||
using Time = std::chrono::steady_clock::time_point;
|
||||
using Duration = std::chrono::steady_clock::duration;
|
||||
@ -95,6 +94,7 @@ private:
|
||||
void Reuse(const PipeInfo &pipeInfo, const DeviceId &deviceId,
|
||||
uint32_t qosType, std::shared_ptr<SoftBusClient> &conn);
|
||||
void GetExpireTime(std::shared_ptr<SoftBusClient> &conn);
|
||||
std::pair<Status, int32_t> OpenConnect(const std::shared_ptr<SoftBusClient> &conn, const DeviceId &deviceId);
|
||||
static constexpr const char *PKG_NAME = "distributeddata-default";
|
||||
static constexpr Time INVALID_NEXT = std::chrono::steady_clock::time_point::max();
|
||||
static constexpr uint32_t QOS_COUNT = 3;
|
||||
|
@ -189,8 +189,8 @@ void SoftBusAdapter::GetExpireTime(std::shared_ptr<SoftBusClient> &conn)
|
||||
}
|
||||
}
|
||||
|
||||
Status SoftBusAdapter::SendData(const PipeInfo &pipeInfo, const DeviceId &deviceId, const DataInfo &dataInfo,
|
||||
uint32_t length, const MessageInfo &info)
|
||||
std::pair<Status, int32_t> SoftBusAdapter::SendData(const PipeInfo &pipeInfo, const DeviceId &deviceId,
|
||||
const DataInfo &dataInfo, uint32_t length, const MessageInfo &info)
|
||||
{
|
||||
std::shared_ptr<SoftBusClient> conn;
|
||||
bool isOHOSType = DmAdapter::GetInstance().IsOHOSType(deviceId.deviceId);
|
||||
@ -218,29 +218,35 @@ Status SoftBusAdapter::SendData(const PipeInfo &pipeInfo, const DeviceId &device
|
||||
Reuse(pipeInfo, deviceId, qosType, conn);
|
||||
}
|
||||
if (conn == nullptr) {
|
||||
return Status::ERROR;
|
||||
return std::make_pair(Status::ERROR, 0);
|
||||
}
|
||||
auto status = conn->CheckStatus();
|
||||
if (status == Status::RATE_LIMIT) {
|
||||
return Status::RATE_LIMIT;
|
||||
return std::make_pair(Status::RATE_LIMIT, 0);
|
||||
}
|
||||
if (status != Status::SUCCESS) {
|
||||
auto task = [this, connect = std::weak_ptr<SoftBusClient>(conn)]() {
|
||||
auto conn = connect.lock();
|
||||
if (conn != nullptr) {
|
||||
conn->OpenConnect(&clientListener_);
|
||||
}
|
||||
};
|
||||
auto networkId = DmAdapter::GetInstance().GetDeviceInfo(deviceId.deviceId).networkId;
|
||||
ConnectManager::GetInstance()->ApplyConnect(networkId, task);
|
||||
return Status::RATE_LIMIT;
|
||||
return OpenConnect(conn, deviceId);
|
||||
}
|
||||
|
||||
status = conn->SendData(dataInfo, &clientListener_);
|
||||
if ((status != Status::NETWORK_ERROR) && (status != Status::RATE_LIMIT)) {
|
||||
GetExpireTime(conn);
|
||||
}
|
||||
return status;
|
||||
auto errCode = conn->GetSoftBusError();
|
||||
return std::make_pair(status, errCode);
|
||||
}
|
||||
|
||||
std::pair<Status, int32_t> SoftBusAdapter::OpenConnect(const std::shared_ptr<SoftBusClient> &conn,
|
||||
const DeviceId &deviceId)
|
||||
{
|
||||
auto task = [this, connect = std::weak_ptr<SoftBusClient>(conn)]() {
|
||||
auto conn = connect.lock();
|
||||
if (conn != nullptr) {
|
||||
conn->OpenConnect(&clientListener_);
|
||||
}
|
||||
};
|
||||
auto networkId = DmAdapter::GetInstance().GetDeviceInfo(deviceId.deviceId).networkId;
|
||||
ConnectManager::GetInstance()->ApplyConnect(networkId, task);
|
||||
return std::make_pair(Status::RATE_LIMIT, 0);
|
||||
}
|
||||
|
||||
void SoftBusAdapter::StartCloseSessionTask(const std::string &deviceId)
|
||||
|
@ -75,12 +75,20 @@ Status SoftBusClient::SendData(const DataInfo &dataInfo, const ISocketListener *
|
||||
if (ret != SOFTBUS_OK) {
|
||||
expireTime_ = std::chrono::steady_clock::now();
|
||||
ZLOGE("send data to socket%{public}d failed, ret:%{public}d.", socket_, ret);
|
||||
softBusError_ = ret;
|
||||
return Status::ERROR;
|
||||
}
|
||||
softBusError_ = 0;
|
||||
expireTime_ = CalcExpireTime();
|
||||
return Status::SUCCESS;
|
||||
}
|
||||
|
||||
int32_t SoftBusClient::GetSoftBusError()
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(mutex_);
|
||||
return softBusError_;
|
||||
}
|
||||
|
||||
Status SoftBusClient::OpenConnect(const ISocketListener *listener)
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(mutex_);
|
||||
@ -114,10 +122,8 @@ Status SoftBusClient::OpenConnect(const ISocketListener *listener)
|
||||
}
|
||||
ZLOGI("Bind Start, device:%{public}s socket:%{public}d type:%{public}u",
|
||||
KvStoreUtils::ToBeAnonymous(client->device_.deviceId).c_str(), clientSocket, type);
|
||||
auto status = client->Open(clientSocket, QOS_INFOS[type % QOS_BUTT], listener);
|
||||
if (status == Status::SUCCESS) {
|
||||
Context::GetInstance().NotifySessionReady(client->device_.deviceId);
|
||||
}
|
||||
int32_t status = client->Open(clientSocket, QOS_INFOS[type % QOS_BUTT], listener);
|
||||
Context::GetInstance().NotifySessionReady(client->device_.deviceId, status);
|
||||
client->isOpening_.store(false);
|
||||
};
|
||||
Context::GetInstance().GetThreadPool()->Execute(task);
|
||||
@ -138,7 +144,7 @@ Status SoftBusClient::CheckStatus()
|
||||
return Status::ERROR;
|
||||
}
|
||||
|
||||
Status SoftBusClient::Open(int32_t socket, const QosTV qos[], const ISocketListener *listener)
|
||||
int32_t SoftBusClient::Open(int32_t socket, const QosTV qos[], const ISocketListener *listener)
|
||||
{
|
||||
int32_t status = ::Bind(socket, qos, QOS_COUNT, listener);
|
||||
ZLOGI("Bind %{public}s,session:%{public}s,socketId:%{public}d",
|
||||
@ -148,14 +154,15 @@ Status SoftBusClient::Open(int32_t socket, const QosTV qos[], const ISocketListe
|
||||
ZLOGE("[Bind] device:%{public}s socket failed, session:%{public}s,result:%{public}d",
|
||||
KvStoreUtils::ToBeAnonymous(device_.deviceId).c_str(), pipe_.pipeId.c_str(), status);
|
||||
::Shutdown(socket);
|
||||
return Status::NETWORK_ERROR;
|
||||
return status;
|
||||
}
|
||||
UpdateExpireTime();
|
||||
uint32_t mtu = 0;
|
||||
std::tie(status, mtu) = GetMtu(socket);
|
||||
if (status != SOFTBUS_OK) {
|
||||
ZLOGE("GetMtu failed, session:%{public}s, socket:%{public}d", pipe_.pipeId.c_str(), socket_);
|
||||
return Status::NETWORK_ERROR;
|
||||
ZLOGE("GetMtu failed, session:%{public}s, socket:%{public}d, status:%{public}d", pipe_.pipeId.c_str(), socket_,
|
||||
status);
|
||||
return status;
|
||||
}
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(mutex_);
|
||||
@ -166,7 +173,7 @@ Status SoftBusClient::Open(int32_t socket, const QosTV qos[], const ISocketListe
|
||||
ZLOGI("open %{public}s, session:%{public}s success, socket:%{public}d",
|
||||
KvStoreUtils::ToBeAnonymous(device_.deviceId).c_str(), pipe_.pipeId.c_str(), socket_);
|
||||
ConnectManager::GetInstance()->OnSessionOpen(DmAdapter::GetInstance().GetDeviceInfo(device_.deviceId).networkId);
|
||||
return Status::SUCCESS;
|
||||
return status;
|
||||
}
|
||||
|
||||
SoftBusClient::Time SoftBusClient::GetExpireTime() const
|
||||
|
@ -48,11 +48,12 @@ public:
|
||||
int32_t GetSocket() const;
|
||||
uint32_t GetQoSType() const;
|
||||
void UpdateExpireTime();
|
||||
int32_t GetSoftBusError();
|
||||
bool needRemove = false;
|
||||
bool isReuse = false;
|
||||
|
||||
private:
|
||||
Status Open(int32_t socket, const QosTV qos[], const ISocketListener *listener);
|
||||
int32_t Open(int32_t socket, const QosTV qos[], const ISocketListener *listener);
|
||||
std::pair<int32_t, uint32_t> GetMtu(int32_t socket);
|
||||
Time CalcExpireTime() const;
|
||||
|
||||
@ -85,6 +86,7 @@ private:
|
||||
|
||||
int32_t socket_ = INVALID_SOCKET_ID;
|
||||
int32_t bindState_ = -1;
|
||||
int32_t softBusError_ = 0;
|
||||
};
|
||||
} // namespace OHOS::AppDistributedKv
|
||||
|
||||
|
@ -133,8 +133,8 @@ HWTEST_F(CommunicationProviderImplTest, CommunicationProvider005, TestSize.Level
|
||||
const uint8_t *t = reinterpret_cast<const uint8_t*>(content.c_str());
|
||||
DeviceId di17 = {"127.0.0.2"};
|
||||
DataInfo data = { const_cast<uint8_t *>(t), static_cast<uint32_t>(content.length())};
|
||||
Status status = CommunicationProvider::GetInstance().SendData(id17, di17, data, 0);
|
||||
EXPECT_NE(status, Status::SUCCESS);
|
||||
auto status = CommunicationProvider::GetInstance().SendData(id17, di17, data, 0);
|
||||
EXPECT_NE(status.first, Status::SUCCESS);
|
||||
CommunicationProvider::GetInstance().StopWatchDataChange(dataListener17, id17);
|
||||
CommunicationProvider::GetInstance().Stop(id17);
|
||||
delete dataListener17;
|
||||
@ -234,8 +234,8 @@ HWTEST_F(CommunicationProviderImplTest, CommunicationProvider011, TestSize.Level
|
||||
const uint8_t *t = reinterpret_cast<const uint8_t*>(content.c_str());
|
||||
DeviceId di = {"DeviceId"};
|
||||
DataInfo data = { const_cast<uint8_t *>(t), static_cast<uint32_t>(content.length())};
|
||||
Status status = CommunicationProvider::GetInstance().SendData(id, di, data, 0);
|
||||
EXPECT_EQ(status, Status::ERROR);
|
||||
auto status = CommunicationProvider::GetInstance().SendData(id, di, data, 0);
|
||||
EXPECT_EQ(status.first, Status::ERROR);
|
||||
CommunicationProvider::GetInstance().StopWatchDataChange(dataListener, id);
|
||||
CommunicationProvider::GetInstance().Stop(id);
|
||||
delete dataListener;
|
||||
@ -259,8 +259,8 @@ HWTEST_F(CommunicationProviderImplTest, CommunicationProvider012, TestSize.Level
|
||||
const uint8_t *t = reinterpret_cast<const uint8_t*>(content.c_str());
|
||||
DeviceId di = {""};
|
||||
DataInfo data = { const_cast<uint8_t *>(t), static_cast<uint32_t>(content.length())};
|
||||
Status status = CommunicationProvider::GetInstance().SendData(id, di, data, 0);
|
||||
EXPECT_EQ(status, Status::ERROR);
|
||||
auto status = CommunicationProvider::GetInstance().SendData(id, di, data, 0);
|
||||
EXPECT_EQ(status.first, Status::ERROR);
|
||||
CommunicationProvider::GetInstance().StopWatchDataChange(dataListener, id);
|
||||
CommunicationProvider::GetInstance().Stop(id);
|
||||
delete dataListener;
|
||||
@ -282,8 +282,8 @@ HWTEST_F(CommunicationProviderImplTest, CommunicationProvider013, TestSize.Level
|
||||
CommunicationProvider::GetInstance().Start(id);
|
||||
DeviceId di = {"DeviceId"};
|
||||
DataInfo data = {nullptr, 0};
|
||||
Status status = CommunicationProvider::GetInstance().SendData(id, di, data, 0);
|
||||
EXPECT_EQ(status, Status::ERROR);
|
||||
auto status = CommunicationProvider::GetInstance().SendData(id, di, data, 0);
|
||||
EXPECT_EQ(status.first, Status::ERROR);
|
||||
CommunicationProvider::GetInstance().StopWatchDataChange(dataListener, id);
|
||||
CommunicationProvider::GetInstance().Stop(id);
|
||||
delete dataListener;
|
||||
@ -307,8 +307,8 @@ HWTEST_F(CommunicationProviderImplTest, CommunicationProvider014, TestSize.Level
|
||||
const uint8_t *t = reinterpret_cast<const uint8_t*>(content.c_str());
|
||||
DeviceId di = {"DeviceId"};
|
||||
DataInfo data = { const_cast<uint8_t *>(t), static_cast<uint32_t>(content.length())};
|
||||
Status status = CommunicationProvider::GetInstance().SendData(id, di, data, 0);
|
||||
EXPECT_EQ(status, Status::ERROR);
|
||||
auto status = CommunicationProvider::GetInstance().SendData(id, di, data, 0);
|
||||
EXPECT_EQ(status.first, Status::ERROR);
|
||||
CommunicationProvider::GetInstance().StopWatchDataChange(dataListener, id);
|
||||
CommunicationProvider::GetInstance().Stop(id);
|
||||
delete dataListener;
|
||||
|
@ -155,8 +155,8 @@ HWTEST_F(SoftbusAdapterStandardTest, SendData, TestSize.Level1)
|
||||
const uint8_t *t = reinterpret_cast<const uint8_t*>(content.c_str());
|
||||
DeviceId di = {"DeviceId"};
|
||||
DataInfo data = { const_cast<uint8_t *>(t), static_cast<uint32_t>(content.length())};
|
||||
Status status = SoftBusAdapter::GetInstance()->SendData(id, di, data, 11, { MessageType::DEFAULT });
|
||||
EXPECT_NE(status, Status::SUCCESS);
|
||||
auto status = SoftBusAdapter::GetInstance()->SendData(id, di, data, 11, { MessageType::DEFAULT });
|
||||
EXPECT_NE(status.first, Status::SUCCESS);
|
||||
SoftBusAdapter::GetInstance()->StopWatchDataChange(dataListener, id);
|
||||
delete dataListener;
|
||||
}
|
||||
|
@ -32,7 +32,7 @@ public:
|
||||
{
|
||||
return ChangeLevelType::HIGH;
|
||||
}
|
||||
API_EXPORT virtual void OnSessionReady(const DeviceInfo &info) const {}
|
||||
API_EXPORT virtual void OnSessionReady(const DeviceInfo &info, int32_t errCode) const {}
|
||||
};
|
||||
} // namespace AppDistributedKv
|
||||
} // namespace OHOS
|
||||
|
@ -45,8 +45,9 @@ public:
|
||||
virtual Status StopWatchDataChange(const AppDataChangeListener *observer, const PipeInfo &pipeInfo) = 0;
|
||||
|
||||
// Send data to other device, function will be called back after sent to notify send result
|
||||
virtual Status SendData(const PipeInfo &pipeInfo, const DeviceId &deviceId, const DataInfo &dataInfo,
|
||||
uint32_t totalLength, const MessageInfo &info = { MessageType::DEFAULT }) = 0;
|
||||
virtual std::pair<Status, int32_t> SendData(const PipeInfo &pipeInfo, const DeviceId &deviceId,
|
||||
const DataInfo &dataInfo, uint32_t totalLength,
|
||||
const MessageInfo &info = { MessageType::DEFAULT }) = 0;
|
||||
|
||||
// start one server to listen data from other devices;
|
||||
virtual Status Start(const PipeInfo &pipeInfo) = 0;
|
||||
|
@ -36,7 +36,7 @@ public:
|
||||
std::shared_ptr<ExecutorPool> GetThreadPool();
|
||||
Status RegSessionListener(const DevChangeListener *observer);
|
||||
Status UnRegSessionListener(const DevChangeListener *observer);
|
||||
void NotifySessionReady(const std::string &deviceId);
|
||||
void NotifySessionReady(const std::string &deviceId, int32_t errCode);
|
||||
void NotifySessionClose(const std::string &deviceId);
|
||||
void SetSessionListener(const OnCloseAble &closeAbleCallback);
|
||||
bool IsSessionReady(const std::string &deviceId);
|
||||
|
@ -57,7 +57,7 @@ public:
|
||||
std::vector<DeviceInfos> GetRemoteOnlineDeviceInfosList() override;
|
||||
bool IsSameProcessLabelStartedOnPeerDevice(const DeviceInfos &peerDevInfo) override;
|
||||
void OnDeviceChanged(const DeviceInfo &info, const DeviceChangeType &type) const override;
|
||||
void OnSessionReady(const DeviceInfo &info) const override;
|
||||
void OnSessionReady(const DeviceInfo &info, int32_t errCode) const override;
|
||||
|
||||
API_EXPORT std::shared_ptr<DistributedDB::ExtendHeaderHandle> GetExtendHeaderHandle(
|
||||
const DistributedDB::ExtendInfo &info) override;
|
||||
|
@ -34,8 +34,9 @@ void KvStoreDeviceListener::OnDeviceChanged(
|
||||
ZLOGI("device is %{public}d", type);
|
||||
}
|
||||
|
||||
void KvStoreDeviceListener::OnSessionReady(const AppDistributedKv::DeviceInfo &info) const
|
||||
void KvStoreDeviceListener::OnSessionReady(const AppDistributedKv::DeviceInfo &info, int32_t errCode) const
|
||||
{
|
||||
(void)errCode;
|
||||
kvStoreDataService_.OnSessionReady(info);
|
||||
}
|
||||
|
||||
|
@ -26,7 +26,7 @@ public:
|
||||
void OnDeviceChanged(
|
||||
const AppDistributedKv::DeviceInfo &info, const AppDistributedKv::DeviceChangeType &type) const override;
|
||||
AppDistributedKv::ChangeLevelType GetChangeLevelType() const override;
|
||||
void OnSessionReady(const AppDistributedKv::DeviceInfo &info) const override;
|
||||
void OnSessionReady(const AppDistributedKv::DeviceInfo &info, int32_t errCode = 0) const override;
|
||||
|
||||
private:
|
||||
KvStoreDataService &kvStoreDataService_;
|
||||
|
@ -973,7 +973,7 @@ Status KVDBServiceImpl::DoSyncInOrder(
|
||||
if (uuids.empty()) {
|
||||
ZLOGW("no device seqId:0x%{public}" PRIx64 " remote:%{public}zu appId:%{public}s storeId:%{public}s",
|
||||
info.seqId, info.devices.size(), meta.bundleName.c_str(), Anonymous::Change(meta.storeId).c_str());
|
||||
return Status::ERROR;
|
||||
return Status::DEVICE_NOT_ONLINE;
|
||||
}
|
||||
if (IsNeedMetaSync(meta, uuids)) {
|
||||
auto recv = DeviceMatrix::GetInstance().GetRecvLevel(uuids[0],
|
||||
@ -1121,8 +1121,14 @@ Status KVDBServiceImpl::DoComplete(const StoreMetaData &meta, const SyncInfo &in
|
||||
SYNC_STORE_ID, Anonymous::Change(meta.storeId), SYNC_APP_ID, meta.bundleName, CONCURRENT_ID,
|
||||
std::to_string(info.syncId), DATA_TYPE, meta.dataType);
|
||||
std::map<std::string, Status> result;
|
||||
for (auto &[key, status] : dbResult) {
|
||||
result[key] = ConvertDbStatus(status);
|
||||
if (AccessTokenKit::GetTokenTypeFlag(meta.tokenId) != TOKEN_HAP) {
|
||||
for (auto &[key, status] : dbResult) {
|
||||
result[key] = ConvertDbStatusNative(status);
|
||||
}
|
||||
} else {
|
||||
for (auto &[key, status] : dbResult) {
|
||||
result[key] = ConvertDbStatus(status);
|
||||
}
|
||||
}
|
||||
for (const auto &device : info.devices) {
|
||||
auto it = result.find(device);
|
||||
@ -1145,6 +1151,18 @@ Status KVDBServiceImpl::DoComplete(const StoreMetaData &meta, const SyncInfo &in
|
||||
return SUCCESS;
|
||||
}
|
||||
|
||||
Status KVDBServiceImpl::ConvertDbStatusNative(DBStatus status)
|
||||
{
|
||||
auto innerStatus = static_cast<int32_t>(status);
|
||||
if (innerStatus < 0) {
|
||||
return static_cast<Status>(status);
|
||||
} else if (status == DBStatus::COMM_FAILURE) {
|
||||
return Status::DEVICE_NOT_ONLINE;
|
||||
} else {
|
||||
return ConvertDbStatus(status);
|
||||
}
|
||||
}
|
||||
|
||||
uint32_t KVDBServiceImpl::GetSyncDelayTime(uint32_t delay, const StoreId &storeId)
|
||||
{
|
||||
if (delay != 0) {
|
||||
|
@ -150,6 +150,7 @@ private:
|
||||
void TryToSync(const StoreMetaData &metaData, bool force = false);
|
||||
bool IsRemoteChange(const StoreMetaData &metaData, const std::string &device);
|
||||
bool IsOHOSType(const std::vector<std::string> &ids);
|
||||
Status ConvertDbStatusNative(DBStatus status);
|
||||
static Factory factory_;
|
||||
ConcurrentMap<uint32_t, SyncAgent> syncAgents_;
|
||||
std::shared_ptr<ExecutorPool> executors_;
|
||||
|
Loading…
Reference in New Issue
Block a user