!1406 【分布式DB】ratelimit场景下透传软总线错误码

Merge pull request !1406 from suyue/dts
This commit is contained in:
openharmony_ci 2024-11-13 08:14:53 +00:00 committed by Gitee
commit 6b0ec0ca64
No known key found for this signature in database
GPG Key ID: 173E9B9CA92EEF8F
24 changed files with 293 additions and 114 deletions

View File

@ -25,7 +25,7 @@
namespace DistributedDB {
using LabelType = std::vector<uint8_t>;
using Finalizer = std::function<void(void)>;
using OnSendEnd = std::function<void(int result)>;
using OnSendEnd = std::function<void(int result, bool isDirectEnd)>;
using OnConnectCallback = std::function<void(const std::string &target, bool isConnect)>;
constexpr unsigned int COMM_LABEL_LENGTH = 32; // Using SHA256 which length is 32
constexpr uint32_t MAX_TOTAL_LEN = 104857600; // 100M Limitation For Max Total Length

View File

@ -28,7 +28,7 @@ namespace DistributedDB {
using BytesReceiveCallback = std::function<void(const std::string &srcTarget, const uint8_t *bytes, uint32_t length,
const std::string &userId)>;
using TargetChangeCallback = std::function<void(const std::string &target, bool isConnect)>;
using SendableCallback = std::function<void(const std::string &target)>;
using SendableCallback = std::function<void(const std::string &target, int softBusErrCode)>;
class IAdapter {
public:

View File

@ -78,6 +78,7 @@ public:
uint32_t GetNoDelayTaskCount() const;
void InvalidSendTask(const std::string &target);
void SetSoftBusErrCode(const std::string &target, int softBusErrCode);
private:
int ScheduleDelayTask(SendTask &outTask, SendTaskInfo &outTaskInfo);
@ -101,6 +102,8 @@ private:
bool scheduledFlag_ = false;
std::string lastScheduleTarget_;
Priority lastSchedulePriority_ = Priority::LOW;
std::map<std::string, int> softBusErrCodeMap_;
};
}

View File

@ -281,7 +281,7 @@ void DoOnSendEndByTaskIfNeed(const OnSendEnd &onEnd, int result)
if (onEnd) { // LCOV_EXCL_BR_LINE
TaskAction onSendEndTask = [onEnd, result]() {
LOGD("[CommAggr][SendEndTask] Before On Send End.");
onEnd(result);
onEnd(result, true);
LOGD("[CommAggr][SendEndTask] After On Send End.");
};
int errCode = RuntimeContext::GetInstance()->ScheduleTask(onSendEndTask);
@ -452,7 +452,7 @@ void CommunicatorAggregator::TaskFinalizer(const SendTask &inTask, int result)
// Call the OnSendEnd if need
if (inTask.onEnd) {
LOGD("[CommAggr][TaskFinal] On Send End.");
inTask.onEnd(result);
inTask.onEnd(result, true);
}
// Finalize the task that just scheduled
int errCode = scheduler_.FinalizeLastScheduleTask();
@ -738,10 +738,13 @@ int CommunicatorAggregator::RegCallbackToAdapter()
}
RefObject::IncObjRef(this); // Reference to be hold by adapter
errCode = adapterHandle_->RegSendableCallback([this](const std::string &target) {
LOGI("[CommAggr] Send able dev=%.3s", target.c_str());
(void)IncreaseSendSequenceId(target);
OnSendable(target);
errCode = adapterHandle_->RegSendableCallback([this](const std::string &target, int softBusErrCode) {
LOGI("[CommAggr] Send able dev=%.3s, softBusErrCode=%d", target.c_str(), softBusErrCode);
if (softBusErrCode == E_OK) {
(void)IncreaseSendSequenceId(target);
OnSendable(target);
}
scheduler_.SetSoftBusErrCode(target, softBusErrCode);
},
[this]() { RefObject::DecObjRef(this); });
if (errCode != E_OK) {

View File

@ -84,9 +84,9 @@ int NetworkAdapter::StartAdapter()
LOGI("[NAdapt][Start] ROLLBACK: Stop errCode=%d.", static_cast<int>(errCode));
return -E_PERIPHERAL_INTERFACE_FAIL;
}
processCommunicator_->RegOnSendAble([this](const DeviceInfos &devInfo) {
processCommunicator_->RegOnSendAble([this](const DeviceInfos &devInfo, int softBusErrCode) {
if (onSendableHandle_ != nullptr) {
onSendableHandle_(devInfo.identifier);
onSendableHandle_(devInfo.identifier, softBusErrCode);
}
});
// These code is compensation for the probable defect of IProcessCommunicator implementation.

View File

@ -300,7 +300,22 @@ void SendTaskScheduler::InvalidSendTask(const std::string &target)
for (auto &sendTask : taskGroupByPrio_[priority][target]) {
sendTask.isValid = false;
LOGI("[Scheduler][InvalidSendTask] invalid frameId=%" PRIu32, sendTask.frameId);
if ((softBusErrCodeMap_.count(target) == 0) || (softBusErrCodeMap_[target] == E_OK)) {
continue;
}
LOGE("[Scheduler][InvalidSendTask] target=%.3s, errCode=%d", target.c_str(), softBusErrCodeMap_[target]);
if (sendTask.onEnd) {
LOGI("[Scheduler][InvalidSendTask] On Send End.");
sendTask.onEnd(softBusErrCodeMap_[target], false);
sendTask.onEnd = nullptr;
}
}
}
softBusErrCodeMap_.erase(target);
}
void SendTaskScheduler::SetSoftBusErrCode(const std::string &target, int softBusErrCode)
{
softBusErrCodeMap_[target] = softBusErrCode;
}
}

View File

@ -67,7 +67,7 @@ using OnDeviceChange = std::function<void(const DeviceInfos &devInfo, bool isOnl
// In OnDataReceive, all field of srcDevInfo should be valid
using OnDataReceive = std::function<void(const DeviceInfos &srcDevInfo, const uint8_t *data, uint32_t length)>;
using OnSendAble = std::function<void(const DeviceInfos &deviceInfo)>;
using OnSendAble = std::function<void(const DeviceInfos &deviceInfo, int softBusErrCode)>;
// For all functions with returnType DBStatus:
// return DBStatus::OK if successful, otherwise DBStatus::DB_ERROR if anything wrong.

View File

@ -26,7 +26,7 @@
#include "sync_target.h"
#include "time_helper.h"
namespace DistributedDB {
using CommErrHandler = std::function<void(int)>;
using CommErrHandler = std::function<void(int, bool)>;
class ISyncTaskContext : public virtual RefObject {
public:

View File

@ -519,7 +519,8 @@ int RemoteExecutor::SendRequestMessage(const std::string &target, Message *messa
SendConfig sendConfig;
SetSendConfigParam(syncInterface->GetDbProperties(), target, false, REMOTE_EXECUTOR_SEND_TIME_OUT, sendConfig);
RefObject::IncObjRef(this);
int errCode = communicator->SendMessage(target, message, sendConfig, [this, sessionId](int errCode) {
int errCode = communicator->SendMessage(target, message, sendConfig,
[this, sessionId](int errCode, bool isDirectEnd) {
if (errCode != E_OK) {
DoSendFailed(sessionId, errCode);
}

View File

@ -1066,8 +1066,8 @@ int SingleVerDataSync::SendDataPacket(SyncType syncType, DataRequestPacket *pack
if (performance != nullptr) {
performance->StepTimeRecordStart(PT_TEST_RECORDS::RECORD_DATA_SEND_REQUEST_TO_ACK_RECV);
}
CommErrHandler handler = [this, context, sessionId = message->GetSessionId()](int ret) {
SyncTaskContext::CommErrHandlerFunc(ret, context, sessionId);
CommErrHandler handler = [this, context, sessionId = message->GetSessionId()](int ret, bool isDirectEnd) {
SyncTaskContext::CommErrHandlerFunc(ret, context, sessionId, isDirectEnd);
};
errCode = Send(context, message, handler, packetLen);
if (errCode != E_OK) {
@ -1467,8 +1467,8 @@ int SingleVerDataSync::SendReSendPacket(DataRequestPacket *packet, SingleVerSync
return errCode;
}
SingleVerDataSyncUtils::SetMessageHeadInfo(*message, TYPE_REQUEST, context->GetDeviceId(), sequenceId, sessionId);
CommErrHandler handler = [this, context, sessionId = message->GetSessionId()](int ret) {
SyncTaskContext::CommErrHandlerFunc(ret, context, sessionId);
CommErrHandler handler = [this, context, sessionId = message->GetSessionId()](int ret, bool isDirectEnd) {
SyncTaskContext::CommErrHandlerFunc(ret, context, sessionId, isDirectEnd);
};
errCode = Send(context, message, handler, packetLen);
if (errCode != E_OK) {

View File

@ -111,8 +111,8 @@ int SingleVerDataSync::SendControlPacket(ControlRequestPacket *packet, SingleVer
}
SingleVerDataSyncUtils::SetMessageHeadInfo(*message, TYPE_REQUEST, context->GetDeviceId(),
context->GetSequenceId(), context->GetRequestSessionId());
CommErrHandler handler = [this, context, sessionId = message->GetSessionId()](int ret) {
SyncTaskContext::CommErrHandlerFunc(ret, context, sessionId);
CommErrHandler handler = [this, context, sessionId = message->GetSessionId()](int ret, bool isDirectEnd) {
SyncTaskContext::CommErrHandlerFunc(ret, context, sessionId, isDirectEnd);
};
errCode = Send(context, message, handler, packetLen);
if (errCode != E_OK) {

View File

@ -477,8 +477,9 @@ Event SingleVerSyncStateMachine::DoTimeSync() const
CommErrHandler handler = nullptr;
// Auto sync need do retry don't use errHandler to return.
if (!context_->IsAutoSync()) {
handler = [this, context = context_, requestSessionId = context_->GetRequestSessionId()](int ret) {
SyncTaskContext::CommErrHandlerFunc(ret, context, requestSessionId);
handler = [this, context = context_,
requestSessionId = context_->GetRequestSessionId()](int ret, bool isDirectEnd) {
SyncTaskContext::CommErrHandlerFunc(ret, context, requestSessionId, isDirectEnd);
};
}
int errCode = timeSync_->SyncStart(handler, context_->GetRequestSessionId());
@ -509,8 +510,9 @@ Event SingleVerSyncStateMachine::DoAbilitySync() const
return GetEventAfterTimeSync(context_->GetMode());
}
CommErrHandler handler = [this, context = context_, requestSessionId = context_->GetRequestSessionId()](int ret) {
SyncTaskContext::CommErrHandlerFunc(ret, context, requestSessionId);
CommErrHandler handler = [this, context = context_,
requestSessionId = context_->GetRequestSessionId()](int ret, bool isDirectEnd) {
SyncTaskContext::CommErrHandlerFunc(ret, context, requestSessionId, isDirectEnd);
};
LOGI("[StateMachine][AbilitySync] start abilitySync,label=%s,dev=%s", dataSync_->GetLabel().c_str(),
STR_MASK(context_->GetDeviceId()));

View File

@ -484,7 +484,7 @@ void SyncTaskContext::Abort(int status)
Clear();
}
void SyncTaskContext::CommErrHandlerFunc(int errCode, ISyncTaskContext *context, int32_t sessionId)
void SyncTaskContext::CommErrHandlerFunc(int errCode, ISyncTaskContext *context, int32_t sessionId, bool isDirectEnd)
{
{
std::lock_guard<std::mutex> lock(synTaskContextSetLock_);
@ -496,7 +496,8 @@ void SyncTaskContext::CommErrHandlerFunc(int errCode, ISyncTaskContext *context,
RefObject::IncObjRef(context);
}
static_cast<SyncTaskContext *>(context)->CommErrHandlerFuncInner(errCode, static_cast<uint32_t>(sessionId));
static_cast<SyncTaskContext *>(context)->CommErrHandlerFuncInner(errCode, static_cast<uint32_t>(sessionId),
isDirectEnd);
RefObject::DecObjRef(context);
}
@ -527,7 +528,7 @@ bool SyncTaskContext::IsCommNormal() const
return isCommNormal_;
}
void SyncTaskContext::CommErrHandlerFuncInner(int errCode, uint32_t sessionId)
void SyncTaskContext::CommErrHandlerFuncInner(int errCode, uint32_t sessionId, bool isDirectEnd)
{
{
RefObject::AutoLock lock(this);
@ -536,12 +537,17 @@ void SyncTaskContext::CommErrHandlerFuncInner(int errCode, uint32_t sessionId)
}
if (errCode == E_OK) {
SetCommFailErrCode(errCode);
// when communicator sent message failed, the state machine will get the error and exit this sync task
// it seems unnecessary to change isCommNormal_ value, so just return here
return;
}
}
LOGE("[SyncTaskContext][CommErr] errCode %d", errCode);
LOGE("[SyncTaskContext][CommErr] errCode %d, isDirectEnd %d", errCode, static_cast<int>(isDirectEnd));
if (!isDirectEnd) {
SetErrCodeWhenWaitTimeOut(errCode);
return;
}
if (errCode > 0) {
SetCommFailErrCode(static_cast<int>(COMM_FAILURE));
} else {
@ -857,4 +863,13 @@ void SyncTaskContext::SetCommFailErrCode(int errCode)
{
commErrCode_ = errCode;
}
void SyncTaskContext::SetErrCodeWhenWaitTimeOut(int errCode)
{
if (errCode > 0) {
SetCommFailErrCode(static_cast<int>(TIME_OUT));
} else {
SetCommFailErrCode(errCode);
}
}
} // namespace DistributedDB

View File

@ -174,7 +174,7 @@ public:
virtual void Abort(int status);
// Used in send msg, as execution is asynchronous, should use this function to handle result.
static void CommErrHandlerFunc(int errCode, ISyncTaskContext *context, int32_t sessionId);
static void CommErrHandlerFunc(int errCode, ISyncTaskContext *context, int32_t sessionId, bool isDirectEnd = true);
int GetTaskErrCode() const override;
@ -229,7 +229,7 @@ protected:
virtual void CopyTargetData(const ISyncTarget *target, const TaskParam &taskParam);
void CommErrHandlerFuncInner(int errCode, uint32_t sessionId);
void CommErrHandlerFuncInner(int errCode, uint32_t sessionId, bool isDirectEnd = true);
void KillWait();
@ -247,6 +247,8 @@ protected:
static uint8_t GetPermissionCheckFlag(bool isAutoSync, int syncMode);
void SetErrCodeWhenWaitTimeOut(int errCode);
mutable std::mutex targetQueueLock_;
std::list<ISyncTarget *> requestTargetQueue_;
std::list<ISyncTarget *> responseTargetQueue_;

View File

@ -502,7 +502,7 @@ int TimeSync::TimeSyncDriver(TimerId timerId)
}
std::lock_guard<std::mutex> lock(timeDriverLock_);
int errCode = RuntimeContext::GetInstance()->ScheduleTask([this]() {
CommErrHandler handler = [this](int ret) { CommErrHandlerFunc(ret, this); };
CommErrHandler handler = [this](int ret, bool isDirectEnd) { CommErrHandlerFunc(ret, this); };
(void)this->SyncStart(handler);
std::lock_guard<std::mutex> innerLock(this->timeDriverLock_);
this->timeDriverLockCount_--;
@ -523,7 +523,7 @@ int TimeSync::GetTimeOffset(TimeOffset &outOffset, uint32_t timeout, uint32_t se
std::lock_guard<std::mutex> lock(cvLock_);
isAckReceived_ = false;
}
CommErrHandler handler = [this](int ret) { CommErrHandlerFunc(ret, this); };
CommErrHandler handler = [this](int ret, bool isDirectEnd) { CommErrHandlerFunc(ret, this); };
int errCode = SyncStart(handler, sessionId);
LOGD("TimeSync::GetTimeOffset start, current time = %" PRIu64 ", errCode = %d, timeout = %" PRIu32 " ms",
TimeHelper::GetSysCurrentTime(), errCode, timeout);
@ -621,7 +621,7 @@ int TimeSync::SendMessageWithSendEnd(const Message *message, const CommErrHandle
{
std::shared_ptr<TimeSync> timeSyncPtr = shared_from_this();
auto sessionId = message->GetSessionId();
return SendPacket(deviceId_, message, [handler, timeSyncPtr, sessionId, this](int errCode) {
return SendPacket(deviceId_, message, [handler, timeSyncPtr, sessionId, this](int errCode, bool isDirectEnd) {
if (closed_) {
LOGW("[TimeSync] DB closed, ignore send end! dev=%.3s", deviceId_.c_str());
return;
@ -632,7 +632,7 @@ int TimeSync::SendMessageWithSendEnd(const Message *message, const CommErrHandle
sessionBeginTime_[sessionId] = timeHelper_->GetTime();
}
if (handler != nullptr) {
handler(errCode);
handler(errCode, isDirectEnd);
}
});
}

View File

@ -114,7 +114,7 @@ void SyncOperation::SetStatus(const std::string &deviceId, int status, int commE
return;
}
iter->second = status;
if ((status != OP_COMM_ABNORMAL) || (commErrCode == E_OK)) {
if (((status != OP_COMM_ABNORMAL) && (status != OP_TIMEOUT)) || (commErrCode == E_OK)) {
return;
}
commErrCodeMap_.insert(std::pair<std::string, int>(deviceId, commErrCode));
@ -164,7 +164,7 @@ int SyncOperation::GetMode() const
void SyncOperation::ReplaceCommErrCode(std::map<std::string, int> &finishStatus)
{
for (auto &item : finishStatus) {
if (item.second != OP_COMM_ABNORMAL) {
if ((item.second != OP_COMM_ABNORMAL) && (item.second != OP_TIMEOUT)) {
continue;
}
std::string deviceId = item.first;
@ -476,7 +476,7 @@ std::string SyncOperation::GetFinishDetailMsg(const std::map<std::string, int> &
std::string msg = "Sync detail is:";
for (const auto &[dev, status]: finishStatus) {
msg += "dev=" + DBCommon::StringMasking(dev);
if (status > static_cast<int>(OP_FINISHED_ALL)) {
if ((status > static_cast<int>(OP_FINISHED_ALL)) || (status < E_OK)) {
msg += " sync failed, reason is " + std::to_string(status);
} else {
msg += " sync success";

View File

@ -254,7 +254,7 @@ void AdapterStub::SimulateSendRetry(const std::string &dstTarget)
targetRetrySet_.insert(dstTarget);
}
void AdapterStub::SimulateSendRetryClear(const std::string &dstTarget)
void AdapterStub::SimulateSendRetryClear(const std::string &dstTarget, int softBusErrCode)
{
{
std::lock_guard<std::mutex> retryLockGuard(retryMutex_);
@ -265,7 +265,7 @@ void AdapterStub::SimulateSendRetryClear(const std::string &dstTarget)
}
std::lock_guard<std::mutex> onSendableLockGuard(onSendableMutex_);
if (onSendableHandle_) {
onSendableHandle_(dstTarget);
onSendableHandle_(dstTarget, softBusErrCode);
}
}

View File

@ -67,7 +67,7 @@ public:
void SimulateSendBlockClear();
void SimulateSendRetry(const std::string &dstTarget);
void SimulateSendRetryClear(const std::string &dstTarget);
void SimulateSendRetryClear(const std::string &dstTarget, int softBusErrCode = E_OK);
void SimulateSendPartialLoss();
void SimulateSendPartialLossClear();

View File

@ -964,4 +964,54 @@ HWTEST_F(DistributedDBCommunicatorDeepTest, NetworkAdapter007, TestSize.Level1)
DeviceInfos deviceInfos;
onDataReceive(deviceInfos, data.data(), 1u);
EXPECT_EQ(callByteReceiveCount, 0);
}
/**
* @tc.name: RetrySendExceededLimit001
* @tc.desc: Test send result when the number of retry times exceeds the limit
* @tc.type: FUNC
* @tc.require:
* @tc.author: suyue
*/
HWTEST_F(DistributedDBCommunicatorDeepTest, RetrySendExceededLimit001, TestSize.Level2)
{
/**
* @tc.steps: step1. connect device A with device B and fork SendBytes
* @tc.expected: step1. operation OK
*/
AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
std::atomic<int> count = 0;
g_envDeviceA.adapterHandle->ForkSendBytes([&count]() {
count++;
return -E_WAIT_RETRY;
});
/**
* @tc.steps: step2. the number of retry times for device A to send a message exceeds the limit
* @tc.expected: step2. sendResult fail
*/
std::vector<std::pair<int, bool>> sendResult;
auto sendResultNotifier = [&sendResult](int result, bool isDirectEnd) {
sendResult.push_back(std::pair<int, bool>(result, isDirectEnd));
};
const uint32_t dataLength = 13 * 1024 * 1024; // 13 MB, 1024 is scale
Message *sendMsg = BuildRegedGiantMessage(dataLength);
ASSERT_NE(sendMsg, nullptr);
SendConfig conf = {false, false, 0};
int errCode = g_commAB->SendMessage(DEVICE_NAME_B, sendMsg, conf, sendResultNotifier);
EXPECT_EQ(errCode, E_OK);
std::this_thread::sleep_for(std::chrono::seconds(1)); // Wait 1s to make sure send done
g_envDeviceA.adapterHandle->SimulateSendRetry(DEVICE_NAME_B);
g_envDeviceA.adapterHandle->SimulateSendRetryClear(DEVICE_NAME_B, -E_BASE);
int reTryTimes = 5;
while ((count < 4) && (reTryTimes > 0)) { // Wait to make sure retry exceeds the limit
std::this_thread::sleep_for(std::chrono::seconds(3));
reTryTimes--;
}
ASSERT_EQ(sendResult.size(), static_cast<size_t>(1)); // only one callback result notification
EXPECT_EQ(sendResult[0].first, -E_BASE); // index 0 retry fail
EXPECT_EQ(sendResult[0].second, false);
g_envDeviceA.adapterHandle->ForkSendBytes(nullptr);
AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
}

View File

@ -578,7 +578,7 @@ HWTEST_F(DistributedDBCommunicatorSendReceiveTest, SendResultNotify001, TestSize
{
// preset
std::vector<int> sendResult;
auto sendResultNotifier = [&sendResult](int result) {
auto sendResultNotifier = [&sendResult](int result, bool isDirectEnd) {
sendResult.push_back(result);
};

View File

@ -827,77 +827,6 @@ HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, DeviceOfflineSync002, TestSiz
EXPECT_EQ(value5, value4);
}
/**
* @tc.name: Device Offline Sync 003
* @tc.desc: Test sync statuses when device offline and sendMessage return different errCode
* @tc.type: FUNC
* @tc.require:
* @tc.author: suyue
*/
HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, DeviceOfflineSync003, TestSize.Level1)
{
/**
* @tc.steps: step1. device put data.
* @tc.expected: step1. sync return OK.
*/
std::vector<std::string> devices;
devices.push_back(g_deviceB->GetDeviceId());
devices.push_back(g_deviceC->GetDeviceId());
Key key1 = {'1'};
Value value1 = {'1'};
ASSERT_EQ(g_kvDelegatePtr->Put(key1, value1), OK);
/**
* @tc.steps: step2. call sync when device offline and mock commErrCode is E_BASE(positive number).
* @tc.expected: step2. return COMM_FAILURE.
*/
g_communicatorAggregator->MockCommErrCode(E_BASE);
std::map<std::string, DBStatus> result;
DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
ASSERT_EQ(status, OK);
for (const auto &pair : result) {
LOGD("dev %s, status %d, expectStatus %d", pair.first.c_str(), pair.second, E_BASE);
EXPECT_EQ(pair.second, COMM_FAILURE);
}
/**
* @tc.steps: step3. call sync when device offline and mock commErrCode is -E_BASE(negative number).
* @tc.expected: step3. return -E_BASE.
*/
g_communicatorAggregator->MockCommErrCode(-E_BASE);
status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
ASSERT_EQ(status, OK);
for (const auto &pair : result) {
LOGD("dev %s, status %d, expectStatus %d", pair.first.c_str(), pair.second, COMM_FAILURE);
EXPECT_EQ(pair.second, static_cast<DBStatus>(-E_BASE));
}
/**
* @tc.steps: step4. call sync when device offline and mock commErrCode is INT_MAX.
* @tc.expected: step4. return COMM_FAILURE.
*/
g_communicatorAggregator->MockCommErrCode(INT_MAX);
status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
ASSERT_EQ(status, OK);
for (const auto &pair : result) {
LOGD("dev %s, status %d, expectStatus %d", pair.first.c_str(), pair.second, INT_MAX);
EXPECT_EQ(pair.second, COMM_FAILURE);
}
/**
* @tc.steps: step5. call sync when device offline and mock commErrCode is -INT_MAX.
* @tc.expected: step5. return -INT_MAX.
*/
g_communicatorAggregator->MockCommErrCode(-INT_MAX);
status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
ASSERT_EQ(status, OK);
for (const auto &pair : result) {
LOGD("dev %s, status %d, expectStatus %d", pair.first.c_str(), pair.second, -INT_MAX);
EXPECT_EQ(pair.second, -INT_MAX);
}
g_communicatorAggregator->MockCommErrCode(E_OK);
}
/**
* @tc.name: EncryptedAlgoUpgrade001
* @tc.desc: Test upgrade encrypted db can sync normally
@ -2280,3 +2209,149 @@ HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, MetaBusy001, TestSize.Level1)
g_deviceB->SetSaveDataCallback(nullptr);
RuntimeContext::GetInstance()->StopTaskPool();
}
/**
* @tc.name: TestErrCodePassthrough001
* @tc.desc: Test ErrCode Passthrough when sync comm fail
* @tc.type: FUNC
* @tc.require:
* @tc.author: suyue
*/
HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, TestErrCodePassthrough001, TestSize.Level1)
{
/**
* @tc.steps: step1. device put data.
* @tc.expected: step1. sync return OK.
*/
std::vector<std::string> devices;
devices.push_back(g_deviceB->GetDeviceId());
devices.push_back(g_deviceC->GetDeviceId());
Key key1 = {'1'};
Value value1 = {'1'};
ASSERT_EQ(g_kvDelegatePtr->Put(key1, value1), OK);
/**
* @tc.steps: step2. call sync and mock commErrCode is E_BASE(positive number).
* @tc.expected: step2. return COMM_FAILURE.
*/
g_communicatorAggregator->MockCommErrCode(E_BASE);
std::map<std::string, DBStatus> result;
DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
ASSERT_EQ(status, OK);
for (const auto &pair : result) {
LOGD("dev %s, status %d, expectStatus %d", pair.first.c_str(), pair.second, E_BASE);
EXPECT_EQ(pair.second, COMM_FAILURE);
}
/**
* @tc.steps: step3. call sync and mock commErrCode is -E_BASE(negative number).
* @tc.expected: step3. return -E_BASE.
*/
g_communicatorAggregator->MockCommErrCode(-E_BASE);
status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
ASSERT_EQ(status, OK);
for (const auto &pair : result) {
LOGD("dev %s, status %d, expectStatus %d", pair.first.c_str(), pair.second, COMM_FAILURE);
EXPECT_EQ(pair.second, static_cast<DBStatus>(-E_BASE));
}
/**
* @tc.steps: step4. call sync and mock commErrCode is INT_MAX.
* @tc.expected: step4. return COMM_FAILURE.
*/
g_communicatorAggregator->MockCommErrCode(INT_MAX);
status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
ASSERT_EQ(status, OK);
for (const auto &pair : result) {
LOGD("dev %s, status %d, expectStatus %d", pair.first.c_str(), pair.second, INT_MAX);
EXPECT_EQ(pair.second, COMM_FAILURE);
}
/**
* @tc.steps: step5. call sync and mock commErrCode is -INT_MAX.
* @tc.expected: step5. return -INT_MAX.
*/
g_communicatorAggregator->MockCommErrCode(-INT_MAX);
status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
ASSERT_EQ(status, OK);
for (const auto &pair : result) {
LOGD("dev %s, status %d, expectStatus %d", pair.first.c_str(), pair.second, -INT_MAX);
EXPECT_EQ(pair.second, -INT_MAX);
}
g_communicatorAggregator->MockCommErrCode(E_OK);
}
/**
* @tc.name: TestErrCodePassthrough002
* @tc.desc: Test ErrCode Passthrough when sync time out and isDirectEnd is false
* @tc.type: FUNC
* @tc.require:
* @tc.author: suyue
*/
HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, TestErrCodePassthrough002, TestSize.Level3)
{
/**
* @tc.steps: step1. device put data.
* @tc.expected: step1. sync return OK.
*/
std::vector<std::string> devices;
devices.push_back(g_deviceB->GetDeviceId());
ASSERT_EQ(g_kvDelegatePtr->Put({'k', '1'}, {'v', '1'}), OK);
/**
* @tc.steps: step2. set messageId invalid and isDirectEnd is false
* @tc.expected: step2. make sure deviceA push data failed due to timeout
*/
g_communicatorAggregator->RegOnDispatch([](const std::string &target, DistributedDB::Message *msg) {
ASSERT_NE(msg, nullptr);
if (target == DEVICE_B && msg->GetMessageId() == QUERY_SYNC_MESSAGE) {
msg->SetMessageId(INVALID_MESSAGE_ID);
}
});
g_communicatorAggregator->MockDirectEndFlag(false);
/**
* @tc.steps: step3. call sync and mock errCode is E_BASE(positive number).
* @tc.expected: step3. return TIME_OUT.
*/
std::map<std::string, DBStatus> result;
auto callback = [&result](const std::map<std::string, DBStatus> &map) {
result = map;
};
Query query = Query::Select().PrefixKey({'k', '1'});
g_communicatorAggregator->MockCommErrCode(E_BASE);
EXPECT_EQ(g_kvDelegatePtr->Sync(devices, DistributedDB::SYNC_MODE_PUSH_ONLY, callback, query, true), OK);
EXPECT_EQ(result.size(), devices.size());
EXPECT_EQ(result[DEVICE_B], TIME_OUT);
/**
* @tc.steps: step4. call sync and mock errCode is -E_BASE(negative number).
* @tc.expected: step4. return -E_BASE.
*/
g_communicatorAggregator->MockCommErrCode(-E_BASE);
EXPECT_EQ(g_kvDelegatePtr->Sync(devices, DistributedDB::SYNC_MODE_PUSH_ONLY, callback, query, true), OK);
EXPECT_EQ(result.size(), devices.size());
EXPECT_EQ(result[DEVICE_B], -E_BASE);
/**
* @tc.steps: step5. call sync and mock errCode is E_OK(0).
* @tc.expected: step5. return TIME_OUT.
*/
g_communicatorAggregator->MockCommErrCode(E_OK);
EXPECT_EQ(g_kvDelegatePtr->Sync(devices, DistributedDB::SYNC_MODE_PUSH_ONLY, callback, query, true), OK);
EXPECT_EQ(result.size(), devices.size());
EXPECT_EQ(result[DEVICE_B], TIME_OUT);
/**
* @tc.steps: step6. call sync and mock errCode is -INT_MAX.
* @tc.expected: step6. return - INT_MAX.
*/
g_communicatorAggregator->MockCommErrCode(-INT_MAX);
EXPECT_EQ(g_kvDelegatePtr->Sync(devices, DistributedDB::SYNC_MODE_PUSH_ONLY, callback, query, true), OK);
EXPECT_EQ(result.size(), devices.size());
EXPECT_EQ(result[DEVICE_B], -INT_MAX);
g_communicatorAggregator->RegOnDispatch(nullptr);
g_communicatorAggregator->MockCommErrCode(E_OK);
g_communicatorAggregator->MockDirectEndFlag(true);
}

View File

@ -259,8 +259,8 @@ void VirtualCommunicatorAggregator::CallSendEnd(int errCode, const OnSendEnd &on
errCode = commErrCodeMock_;
}
if (onEnd) {
(void)RuntimeContext::GetInstance()->ScheduleTask([errCode, onEnd]() {
onEnd(errCode);
(void)RuntimeContext::GetInstance()->ScheduleTask([errCode, onEnd, this]() {
onEnd(errCode, isDirectEnd_);
});
}
}
@ -380,4 +380,9 @@ void VirtualCommunicatorAggregator::MockCommErrCode(int mockErrCode)
std::lock_guard<std::mutex> lock(localDeviceIdMutex_);
commErrCodeMock_ = mockErrCode;
}
void VirtualCommunicatorAggregator::MockDirectEndFlag(bool isDirectEnd)
{
isDirectEnd_ = isDirectEnd;
}
} // namespace DistributedDB

View File

@ -95,6 +95,8 @@ public:
void MockCommErrCode(int mockErrCode);
void MockDirectEndFlag(bool isDirectEnd);
~VirtualCommunicatorAggregator() override = default;
VirtualCommunicatorAggregator() = default;
@ -127,6 +129,7 @@ private:
std::string localDeviceId_;
int getLocalDeviceRet_ = E_OK;
int commErrCodeMock_ = E_OK;
bool isDirectEnd_ = true;
};
} // namespace DistributedDB

View File

@ -208,7 +208,12 @@ enum Status : int32_t {
/**
* database can not open.
*/
DB_CANT_OPEN = DISTRIBUTEDDATAMGR_ERR_OFFSET + 42
DB_CANT_OPEN = DISTRIBUTEDDATAMGR_ERR_OFFSET + 42,
/**
* The device is not online.
*/
DEVICE_NOT_ONLINE = DISTRIBUTEDDATAMGR_ERR_OFFSET + 43
};
} // namespace OHOS::DistributedKv
#endif // OHOS_DISTRIBUTED_DATA_INTERFACES_DISTRIBUTEDDATA_STORE_ERRNO_H