!2299 端到端打点

Merge pull request !2299 from wTong888/master
This commit is contained in:
openharmony_ci 2024-09-26 02:38:53 +00:00 committed by Gitee
commit 9ca5294ddd
No known key found for this signature in database
GPG Key ID: 173E9B9CA92EEF8F
21 changed files with 385 additions and 101 deletions

View File

@ -117,6 +117,7 @@
"cloud/cloud_db.h",
"cloud/cloud_event.h",
"cloud/cloud_info.h",
"cloud/cloud_report.h",
"cloud/cloud_server.h",
"cloud/schema_meta.h",
"cloud/subscription.h",

View File

@ -60,6 +60,7 @@ ohos_shared_library("distributeddatasvcfwk") {
"cloud/cloud_extra_data.cpp",
"cloud/cloud_info.cpp",
"cloud/cloud_lock_event.cpp",
"cloud/cloud_report.cpp",
"cloud/cloud_server.cpp",
"cloud/cloud_share_event.cpp",
"cloud/cloud_sync_finished_event.cpp",

View File

@ -99,4 +99,10 @@ std::pair<int32_t, std::string> CloudDB::GetEmptyCursor(const std::string &table
{
return { E_NOT_SUPPORT, "" };
}
void CloudDB::SetPrepareTraceId(const std::string &prepareTraceId)
{
(void)prepareTraceId;
return;
}
} // namespace OHOS::DistributedData

View File

@ -12,11 +12,24 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "cloud/cloud_extra_data.h"
#include "cloud/cloud_config_manager.h"
namespace OHOS::DistributedData {
bool Context::Marshal(Serializable::json &node) const
{
SetValue(node[GET_NAME(traceId)], traceId);
SetValue(node[GET_NAME(prepareTraceId)], prepareTraceId);
return true;
}
bool Context::Unmarshal(const Serializable::json &node)
{
GetValue(node, GET_NAME(traceId), traceId);
GetValue(node, GET_NAME(prepareTraceId), prepareTraceId);
return true;
}
bool ExtensionInfo::Marshal(Serializable::json &node) const
{
SetValue(node[GET_NAME(accountId)], accountId);
@ -38,7 +51,15 @@ bool ExtensionInfo::Unmarshal(const Serializable::json &node)
return false;
}
GetValue(node, GET_NAME(recordTypes), recordTypes);
return Unmarshall(recordTypes, tables);
if (!Unmarshall(recordTypes, tables)) {
return false;
}
std::string data;
GetValue(node, GET_NAME(context), data);
if (data.empty()) {
return true;
}
return context.Unmarshall(data);
}
bool ExtraData::Marshal(Serializable::json &node) const

View File

@ -0,0 +1,49 @@
/*
* Copyright (c) 2024 Huawei Device Co., Ltd.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "cloud/cloud_report.h"
namespace OHOS::DistributedData {
CloudReport *CloudReport::instance_ = nullptr;
CloudReport *CloudReport::GetInstance()
{
return instance_;
}
bool CloudReport::RegisterCloudReportInstance(CloudReport *instance)
{
if (instance_ != nullptr) {
return false;
}
instance_ = instance;
return true;
}
std::string CloudReport::GetPrepareTraceId(int32_t userId, const std::string &bundleName)
{
return "";
}
std::string CloudReport::GetRequestTraceId(int32_t userId, const std::string &bundleName)
{
return "";
}
bool CloudReport::Report(const ReportParam &reportParam)
{
return true;
}
} // namespace OHOS::DistributedData

View File

@ -20,9 +20,11 @@ SyncEvent::EventInfo::EventInfo(int32_t mode, int32_t wait, bool retry, std::sha
: retry_(retry), mode_(mode), wait_(wait), query_(std::move(query)), asyncDetail_(std::move(async))
{
}
SyncEvent::EventInfo::EventInfo(const SyncParam &syncParam, bool retry, std::shared_ptr<GenQuery> query, GenAsync async)
: retry_(retry), mode_(syncParam.mode), wait_(syncParam.wait), query_(std::move(query)),
asyncDetail_(std::move(async)), isCompensation_(syncParam.isCompensation), triggerMode_(syncParam.triggerMode)
asyncDetail_(std::move(async)), isCompensation_(syncParam.isCompensation), triggerMode_(syncParam.triggerMode),
prepareTraceId_(syncParam.prepareTraceId), user_(syncParam.user)
{
}
@ -43,6 +45,8 @@ SyncEvent::EventInfo &SyncEvent::EventInfo::operator=(SyncEvent::EventInfo &&inf
asyncDetail_ = std::move(info.asyncDetail_);
isCompensation_ = info.isCompensation_;
triggerMode_ = info.triggerMode_;
prepareTraceId_ = info.prepareTraceId_;
user_ = info.user_;
return *this;
}
@ -90,4 +94,14 @@ int32_t SyncEvent::GetTriggerMode() const
{
return info_.triggerMode_;
}
std::string SyncEvent::GetPrepareTraceId() const
{
return info_.prepareTraceId_;
}
int32_t SyncEvent::GetUser() const
{
return info_.user_;
}
} // namespace OHOS::DistributedData

View File

@ -63,6 +63,8 @@ public:
virtual int32_t Close();
virtual std::pair<int32_t, std::string> GetEmptyCursor(const std::string &tableName);
virtual void SetPrepareTraceId(const std::string &prepareTraceId);
};
} // namespace OHOS::DistributedData
#endif // OHOS_DISTRIBUTED_DATA_SERVICES_FRAMEWORK_CLOUD_CLOUD_DB_H

View File

@ -20,6 +20,14 @@
#include "serializable/serializable.h"
namespace OHOS::DistributedData {
class API_EXPORT Context final : public Serializable {
public:
std::string traceId;
std::string prepareTraceId;
bool Marshal(json &node) const override;
bool Unmarshal(const json &node) override;
};
class API_EXPORT ExtensionInfo final : public Serializable {
public:
std::string accountId;
@ -29,6 +37,7 @@ public:
std::vector<std::string> scopes;
std::string recordTypes;
std::vector<std::string> tables;
Context context;
bool Marshal(json &node) const override;
bool Unmarshal(const json &node) override;
};

View File

@ -0,0 +1,36 @@
/*
* Copyright (c) 2024 Huawei Device Co., Ltd.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef OHOS_DISTRIBUTED_DATA_SERVICES_FRAMEWORK_CLOUD_CLOUD_REPORT_H
#define OHOS_DISTRIBUTED_DATA_SERVICES_FRAMEWORK_CLOUD_CLOUD_REPORT_H
#include "store/general_value.h"
#include "visibility.h"
namespace OHOS::DistributedData {
class API_EXPORT CloudReport {
public:
using ReportParam = DistributedData::ReportParam;
API_EXPORT static CloudReport *GetInstance();
API_EXPORT static bool RegisterCloudReportInstance(CloudReport *instance);
virtual std::string GetPrepareTraceId(int32_t userId, const std::string &bundleName);
virtual std::string GetRequestTraceId(int32_t userId, const std::string &bundleName);
virtual bool Report(const ReportParam &reportParam);
private:
static CloudReport *instance_;
};
} // namespace OHOS::DistributedData
#endif // OHOS_DISTRIBUTED_DATA_SERVICES_FRAMEWORK_CLOUD_CLOUD_REPORT_H

View File

@ -38,6 +38,8 @@ public:
GenAsync asyncDetail_;
bool isCompensation_ = false;
int32_t triggerMode_ = MODE_DEFAULT;
std::string prepareTraceId_;
int32_t user_ = 0;
};
SyncEvent(StoreInfo storeInfo, EventInfo info);
~SyncEvent() override = default;
@ -48,6 +50,8 @@ public:
GenAsync GetAsyncDetail() const;
bool IsCompensation() const;
int32_t GetTriggerMode() const;
std::string GetPrepareTraceId() const;
int32_t GetUser() const;
protected:
SyncEvent(int32_t evtId, StoreInfo storeInfo, EventInfo info);

View File

@ -97,6 +97,22 @@ struct SyncParam {
int32_t wait;
bool isCompensation = false;
int32_t triggerMode = MODE_DEFAULT;
std::string prepareTraceId;
int32_t user;
};
enum SyncStage : int8_t {
PREPARE = 0,
START,
END
};
struct ReportParam {
int32_t user = 0;
std::string bundleName;
std::string prepareTraceId;
SyncStage syncStage = SyncStage::PREPARE;
int32_t errCode = 0;
};
using Assets = std::vector<Asset>;

View File

@ -405,25 +405,30 @@ int32_t CloudServiceImpl::NotifyDataChange(const std::string &eventId, const std
if (user == DEFAULT_USER) {
continue;
}
auto &bundleName = exData.info.bundleName;
auto &prepareTraceId = exData.info.context.prepareTraceId;
auto [status, cloudInfo] = GetCloudInfoFromMeta(user);
if (CheckNotifyConditions(exData.info.accountId, exData.info.bundleName, cloudInfo) != E_OK) {
ZLOGD("invalid user:%{public}d", user);
if (CheckNotifyConditions(exData.info.accountId, bundleName, cloudInfo) != E_OK) {
ZLOGD("invalid user:%{public}d, traceId:%{public}s", user, prepareTraceId.c_str());
syncManager_.Report({ user, bundleName, prepareTraceId, SyncStage::END, INVALID_ARGUMENT });
return INVALID_ARGUMENT;
}
auto schemaKey = CloudInfo::GetSchemaKey(user, exData.info.bundleName);
auto schemaKey = CloudInfo::GetSchemaKey(user, bundleName);
SchemaMeta schemaMeta;
if (!MetaDataManager::GetInstance().LoadMeta(schemaKey, schemaMeta, true)) {
ZLOGE("no exist meta, user:%{public}d", user);
ZLOGE("no exist meta, user:%{public}d, traceId:%{public}s", user, prepareTraceId.c_str());
syncManager_.Report({ user, bundleName, prepareTraceId, SyncStage::END, INVALID_ARGUMENT });
return INVALID_ARGUMENT;
}
auto dbInfos = GetDbInfoFromExtraData(exData, schemaMeta);
if (dbInfos.empty()) {
ZLOGE("GetDbInfoFromExtraData failed, empty database info.");
ZLOGE("GetDbInfoFromExtraData failed, empty database info. traceId:%{public}s.", prepareTraceId.c_str());
syncManager_.Report({ user, bundleName, prepareTraceId, SyncStage::END, INVALID_ARGUMENT });
return INVALID_ARGUMENT;
}
for (const auto &dbInfo : dbInfos) {
syncManager_.DoCloudSync(
SyncManager::SyncInfo(cloudInfo.user, exData.info.bundleName, dbInfo.first, dbInfo.second, MODE_PUSH));
syncManager_.DoCloudSync(SyncManager::SyncInfo(
{ cloudInfo.user, bundleName, dbInfo.first, dbInfo.second, MODE_PUSH, prepareTraceId }));
}
}
return SUCCESS;

View File

@ -25,6 +25,7 @@
#include "cloud/sync_event.h"
#include "cloud_value_util.h"
#include "cloud/cloud_lock_event.h"
#include "cloud/cloud_report.h"
#include "device_manager_adapter.h"
#include "dfx/radar_reporter.h"
#include "eventcenter/event_center.h"
@ -69,6 +70,16 @@ SyncManager::SyncInfo::SyncInfo(int32_t user, const std::string &bundleName, con
syncId_ = SyncManager::GenerateId(user);
}
SyncManager::SyncInfo::SyncInfo(const Param &param)
: user_(param.user), bundleName_(param.bundleName), triggerMode_(param.triggerMode)
{
if (!param.store.empty()) {
tables_[param.store] = param.tables;
}
syncId_ = SyncManager::GenerateId(param.user);
prepareTraceId_ = param.prepareTraceId;
}
void SyncManager::SyncInfo::SetMode(int32_t mode)
{
mode_ = mode;
@ -266,9 +277,9 @@ GeneralError SyncManager::IsValid(SyncInfo &info, CloudInfo &cloud)
}
std::function<bool()> SyncManager::GetPostEventTask(const std::vector<SchemaMeta> &schemas, CloudInfo &cloud,
SyncInfo &info, bool retry)
SyncInfo &info, bool retry, const TraceIds &traceIds)
{
return [this, &cloud, &info, &schemas, retry]() {
return [this, &cloud, &info, &schemas, retry, &traceIds]() {
bool isPostEvent = false;
for (auto &schema : schemas) {
if (!cloud.IsOn(schema.bundleName)) {
@ -280,17 +291,21 @@ std::function<bool()> SyncManager::GetPostEventTask(const std::vector<SchemaMeta
}
StoreInfo storeInfo = { 0, schema.bundleName, database.name, cloud.apps[schema.bundleName].instanceId,
info.user_, "", info.syncId_ };
auto it = traceIds.find(schema.bundleName);
auto status = syncStrategy_->CheckSyncAction(storeInfo);
if (status != SUCCESS) {
ZLOGW("Verification strategy failed, status:%{public}d. %{public}d:%{public}s:%{public}s", status,
storeInfo.user, storeInfo.bundleName.c_str(), Anonymous::Change(storeInfo.storeName).c_str());
QueryKey queryKey{ cloud.id, schema.bundleName, "" };
UpdateFinishSyncInfo(queryKey, info.syncId_, E_BLOCKED_BY_NETWORK_STRATEGY);
UpdateFinishSyncInfo(queryKey, info.syncId_, status);
Report({ cloud.user, schema.bundleName, it == traceIds.end() ? "" : it->second, SyncStage::END,
status });
info.SetError(status);
continue;
}
auto query = info.GenerateQuery(database.name, database.GetTableNames());
SyncParam syncParam = { info.mode_, info.wait_, info.isCompensation_, info.triggerMode_ };
SyncParam syncParam = { info.mode_, info.wait_, info.isCompensation_, info.triggerMode_,
it == traceIds.end() ? "" : it->second, cloud.user };
auto evt = std::make_unique<SyncEvent>(std::move(storeInfo),
SyncEvent::EventInfo{ syncParam, retry, std::move(query), info.async_ });
EventCenter::GetInstance().PostEvent(std::move(evt));
@ -320,21 +335,26 @@ ExecutorPool::Task SyncManager::GetSyncTask(int32_t times, bool retry, RefCount
info.SetError(E_CLOUD_DISABLED);
return;
}
auto traceIds = GetPrepareTraceId(info, cloud);
BatchReport(info.user_, traceIds, SyncStage::PREPARE, E_OK);
UpdateStartSyncInfo(cloudSyncInfos);
auto code = IsValid(info, cloud);
if (code != E_OK) {
BatchUpdateFinishState(cloudSyncInfos, code);
BatchReport(info.user_, traceIds, SyncStage::END, code);
return;
}
auto retryer = GetRetryer(times, info);
auto retryer = GetRetryer(times, info, cloud.user);
auto schemas = GetSchemaMeta(cloud, info.bundleName_);
if (schemas.empty()) {
UpdateSchema(info);
schemas = GetSchemaMeta(cloud, info.bundleName_);
if (schemas.empty()) {
retryer(RETRY_INTERVAL, E_RETRY_TIMEOUT, E_CLOUD_DISABLED);
auto it = traceIds.find(info.bundleName_);
retryer(RETRY_INTERVAL, E_RETRY_TIMEOUT, E_CLOUD_DISABLED, it == traceIds.end() ? "" : it->second);
BatchUpdateFinishState(cloudSyncInfos, E_CLOUD_DISABLED);
BatchReport(info.user_, traceIds, SyncStage::END, E_CLOUD_DISABLED);
return;
}
}
@ -342,11 +362,11 @@ ExecutorPool::Task SyncManager::GetSyncTask(int32_t times, bool retry, RefCount
if (createdByDefaultUser) {
info.user_ = 0;
}
auto task = GetPostEventTask(schemas, cloud, info, retry);
if (task != nullptr && task()) {
return;
}
auto task = GetPostEventTask(schemas, cloud, info, retry, traceIds);
if (task == nullptr || !task()) {
BatchUpdateFinishState(cloudSyncInfos, E_ERROR);
BatchReport(cloud.user, traceIds, SyncStage::END, E_ERROR);
}
};
}
@ -356,35 +376,31 @@ std::function<void(const Event &)> SyncManager::GetSyncHandler(Retryer retryer)
auto &evt = static_cast<const SyncEvent &>(event);
auto &storeInfo = evt.GetStoreInfo();
GenAsync async = evt.GetAsyncDetail();
auto prepareTraceId = evt.GetPrepareTraceId();
auto user = evt.GetUser();
GenDetails details;
auto &detail = details[SyncInfo::DEFAULT_ID];
detail.progress = GenProgress::SYNC_FINISH;
StoreMetaData meta(storeInfo);
meta.deviceId = DmAdapter::GetInstance().GetLocalDevice().uuid;
if (!MetaDataManager::GetInstance().LoadMeta(meta.GetKey(), meta, true)) {
meta.user = "0"; // check if it is a public store.
StoreMetaDataLocal localMetaData;
if (!MetaDataManager::GetInstance().LoadMeta(meta.GetKeyLocal(), localMetaData, true) ||
!localMetaData.isPublic || !MetaDataManager::GetInstance().LoadMeta(meta.GetKey(), meta, true)) {
ZLOGE("failed, no store meta. bundleName:%{public}s, storeId:%{public}s", meta.bundleName.c_str(),
meta.GetStoreAlias().c_str());
return DoExceptionalCallback(async, details, storeInfo);
}
auto [result, meta] = GetMetaData(storeInfo);
if (!result) {
return DoExceptionalCallback(async, details, storeInfo, prepareTraceId);
}
auto store = GetStore(meta, storeInfo.user);
if (store == nullptr) {
ZLOGE("store null, storeId:%{public}s", meta.GetStoreAlias().c_str());
return DoExceptionalCallback(async, details, storeInfo);
ZLOGE("store null, storeId:%{public}s, prepareTraceId:%{public}s", meta.GetStoreAlias().c_str(),
prepareTraceId.c_str());
return DoExceptionalCallback(async, details, storeInfo, prepareTraceId);
}
ZLOGD("database:<%{public}d:%{public}s:%{public}s> sync start", storeInfo.user, storeInfo.bundleName.c_str(),
meta.GetStoreAlias().c_str());
ZLOGI("database:<%{public}d:%{public}s:%{public}s:%{public}s> sync start", storeInfo.user,
storeInfo.bundleName.c_str(), meta.GetStoreAlias().c_str(), prepareTraceId.c_str());
RadarReporter::Report(
{ storeInfo.bundleName.c_str(), CLOUD_SYNC, TRIGGER_SYNC, storeInfo.syncId, evt.GetTriggerMode() },
"GetSyncHandler", BEGIN);
SyncParam syncParam = { evt.GetMode(), evt.GetWait(), evt.IsCompensation() };
"GetSyncHandler", BizState::BEGIN);
Report({ user, storeInfo.bundleName, prepareTraceId, SyncStage::START, E_OK });
SyncParam syncParam = { evt.GetMode(), evt.GetWait(), evt.IsCompensation(), MODE_DEFAULT, prepareTraceId };
auto status = store->Sync({ SyncInfo::DEFAULT_ID }, *(evt.GetQuery()),
evt.AutoRetry() ? RetryCallback(storeInfo, retryer, evt.GetTriggerMode())
: GetCallback(evt.GetAsyncDetail(), storeInfo, evt.GetTriggerMode()),
evt.AutoRetry() ? RetryCallback(storeInfo, retryer, evt.GetTriggerMode(), prepareTraceId, user)
: GetCallback(evt.GetAsyncDetail(), storeInfo, evt.GetTriggerMode(), prepareTraceId, user),
syncParam);
if (status != E_OK) {
if (async) {
@ -397,7 +413,8 @@ std::function<void(const Event &)> SyncManager::GetSyncHandler(Retryer retryer)
}
int32_t errCode = status + GenStore::DB_ERR_OFFSET;
RadarReporter::Report({ storeInfo.bundleName.c_str(), CLOUD_SYNC, FINISH_SYNC, storeInfo.syncId,
evt.GetTriggerMode(), false, errCode }, "GetSyncHandler", END);
evt.GetTriggerMode(), false, errCode }, "GetSyncHandler", BizState::END);
Report({ user, storeInfo.bundleName, prepareTraceId, SyncStage::END, errCode });
}
};
}
@ -419,29 +436,35 @@ std::function<void(const Event &)> SyncManager::GetClientChangeHandler()
};
}
SyncManager::Retryer SyncManager::GetRetryer(int32_t times, const SyncInfo &syncInfo)
SyncManager::Retryer SyncManager::GetRetryer(int32_t times, const SyncInfo &syncInfo, int32_t user)
{
if (times >= RETRY_TIMES) {
return [info = SyncInfo(syncInfo)](Duration, int32_t code, int32_t dbCode) mutable {
return [this, user, info = SyncInfo(syncInfo)](Duration, int32_t code, int32_t dbCode,
const std::string &prepareTraceId) mutable {
if (code == E_OK || code == E_SYNC_TASK_MERGED) {
return true;
}
info.SetError(code);
RadarReporter::Report(
{ info.bundleName_.c_str(), CLOUD_SYNC, FINISH_SYNC, info.syncId_, info.triggerMode_, false, dbCode },
"GetRetryer", END);
RadarReporter::Report({ info.bundleName_.c_str(), CLOUD_SYNC, FINISH_SYNC, info.syncId_, info.triggerMode_,
false, dbCode },
"GetRetryer", BizState::END);
Report({ user, info.bundleName_, prepareTraceId, SyncStage::END,
dbCode == GenStore::DB_ERR_OFFSET ? 0 : dbCode });
return true;
};
}
return [this, times, info = SyncInfo(syncInfo)](Duration interval, int32_t code, int32_t dbCode) mutable {
return [this, times, user, info = SyncInfo(syncInfo)](Duration interval, int32_t code, int32_t dbCode,
const std::string &prepareTraceId) mutable {
if (code == E_OK || code == E_SYNC_TASK_MERGED) {
return true;
}
if (code == E_NO_SPACE_FOR_ASSET || code == E_RECODE_LIMIT_EXCEEDED) {
info.SetError(code);
RadarReporter::Report(
{ info.bundleName_.c_str(), CLOUD_SYNC, FINISH_SYNC, info.syncId_, info.triggerMode_, false, dbCode },
"GetRetryer", END);
RadarReporter::Report({ info.bundleName_.c_str(), CLOUD_SYNC, FINISH_SYNC, info.syncId_, info.triggerMode_,
false, dbCode },
"GetRetryer", BizState::END);
Report({ user, info.bundleName_, prepareTraceId, SyncStage::END,
dbCode == GenStore::DB_ERR_OFFSET ? 0 : dbCode });
return true;
}
@ -569,6 +592,36 @@ AutoCache::Store SyncManager::GetStore(const StoreMetaData &meta, int32_t user,
return store;
}
void SyncManager::Report(const ReportParam &reportParam)
{
auto cloudReport = CloudReport::GetInstance();
if (cloudReport == nullptr) {
return;
}
cloudReport->Report(reportParam);
}
SyncManager::TraceIds SyncManager::GetPrepareTraceId(const SyncInfo &info, const CloudInfo &cloud)
{
TraceIds traceIds;
if (!info.prepareTraceId_.empty()) {
traceIds.emplace(info.bundleName_, info.prepareTraceId_);
return traceIds;
}
auto cloudReport = CloudReport::GetInstance();
if (cloudReport == nullptr) {
return traceIds;
}
if (info.bundleName_.empty()) {
for (const auto &it : cloud.apps) {
traceIds.emplace(it.first, cloudReport->GetPrepareTraceId(info.user_, it.first));
}
} else {
traceIds.emplace(info.bundleName_, cloudReport->GetPrepareTraceId(info.user_, info.bundleName_));
}
return traceIds;
}
bool SyncManager::NeedGetCloudInfo(CloudInfo &cloud)
{
return (!MetaDataManager::GetInstance().LoadMeta(cloud.GetKey(), cloud, true) || !cloud.enableCloud) &&
@ -678,9 +731,9 @@ void SyncManager::UpdateFinishSyncInfo(const QueryKey &queryKey, uint64_t syncId
}
std::function<void(const GenDetails &result)> SyncManager::GetCallback(const GenAsync &async,
const StoreInfo &storeInfo, int32_t triggerMode)
const StoreInfo &storeInfo, int32_t triggerMode, const std::string &prepareTraceId, int32_t user)
{
return [this, async, storeInfo, triggerMode](const GenDetails &result) {
return [this, async, storeInfo, triggerMode, prepareTraceId, user](const GenDetails &result) {
if (async != nullptr) {
async(result);
}
@ -694,6 +747,12 @@ std::function<void(const GenDetails &result)> SyncManager::GetCallback(const Gen
return;
}
int32_t dbCode = (result.begin()->second.dbCode == GenStore::DB_ERR_OFFSET) ? 0 : result.begin()->second.dbCode;
RadarReporter::Report({ storeInfo.bundleName.c_str(), CLOUD_SYNC, FINISH_SYNC, storeInfo.syncId, triggerMode,
result.begin()->second.changeCount, dbCode },
"GetCallback", BizState::END);
Report({ storeInfo.user, storeInfo.bundleName, prepareTraceId, SyncStage::END, dbCode });
auto id = GetAccountId(storeInfo.user);
if (id.empty()) {
ZLOGD("account id is empty");
@ -706,11 +765,7 @@ std::function<void(const GenDetails &result)> SyncManager::GetCallback(const Gen
};
int32_t code = result.begin()->second.code;
int32_t dbCode = (result.begin()->second.dbCode == GenStore::DB_ERR_OFFSET) ? 0 : result.begin()->second.dbCode;
UpdateFinishSyncInfo(queryKey, storeInfo.syncId, code);
RadarReporter::Report({ storeInfo.bundleName.c_str(), CLOUD_SYNC, FINISH_SYNC, storeInfo.syncId, triggerMode,
result.begin()->second.changeCount, dbCode },
"GetCallback", END);
};
}
@ -745,7 +800,8 @@ std::vector<SchemaMeta> SyncManager::GetSchemaMeta(const CloudInfo &cloud, const
return schemas;
}
void SyncManager::DoExceptionalCallback(const GenAsync &async, GenDetails &details, const StoreInfo &storeInfo)
void SyncManager::DoExceptionalCallback(const GenAsync &async, GenDetails &details, const StoreInfo &storeInfo,
const std::string &prepareTraceId)
{
if (async) {
details[SyncInfo::DEFAULT_ID].code = E_ERROR;
@ -753,6 +809,7 @@ void SyncManager::DoExceptionalCallback(const GenAsync &async, GenDetails &detai
}
QueryKey queryKey{ GetAccountId(storeInfo.user), storeInfo.bundleName, "" };
UpdateFinishSyncInfo(queryKey, storeInfo.syncId, E_ERROR);
Report({ storeInfo.user, storeInfo.bundleName, prepareTraceId, SyncStage::END, E_ERROR });
}
bool SyncManager::InitDefaultUser(int32_t &user)
@ -768,10 +825,10 @@ bool SyncManager::InitDefaultUser(int32_t &user)
return true;
}
std::function<void(const DistributedData::GenDetails &result)> SyncManager::RetryCallback(
const StoreInfo &storeInfo, Retryer retryer, int32_t triggerMode)
std::function<void(const DistributedData::GenDetails &result)> SyncManager::RetryCallback(const StoreInfo &storeInfo,
Retryer retryer, int32_t triggerMode, const std::string &prepareTraceId, int32_t user)
{
return [this, retryer, storeInfo, triggerMode](const GenDetails &details) {
return [this, retryer, storeInfo, triggerMode, prepareTraceId, user](const GenDetails &details) {
if (details.empty()) {
ZLOGE("retry, details empty");
return;
@ -784,10 +841,12 @@ std::function<void(const DistributedData::GenDetails &result)> SyncManager::Retr
if (code == E_OK) {
RadarReporter::Report({ storeInfo.bundleName.c_str(), CLOUD_SYNC, FINISH_SYNC, storeInfo.syncId,
triggerMode, details.begin()->second.changeCount },
"RetryCallback", END);
"RetryCallback", BizState::END);
Report({ user, storeInfo.bundleName, prepareTraceId, SyncStage::END,
dbCode == GenStore::DB_ERR_OFFSET ? 0 : dbCode });
}
}
retryer(GetInterval(code), code, dbCode);
retryer(GetInterval(code), code, dbCode, prepareTraceId);
};
}
@ -799,4 +858,27 @@ void SyncManager::BatchUpdateFinishState(const std::vector<std::tuple<QueryKey,
}
}
void SyncManager::BatchReport(int32_t userId, const TraceIds &traceIds, SyncStage syncStage, int32_t errCode)
{
for (const auto &[bundle, id] : traceIds) {
Report({ userId, bundle, id, syncStage, errCode });
}
}
std::pair<bool, StoreMetaData> SyncManager::GetMetaData(const StoreInfo &storeInfo)
{
StoreMetaData meta(storeInfo);
meta.deviceId = DmAdapter::GetInstance().GetLocalDevice().uuid;
if (!MetaDataManager::GetInstance().LoadMeta(meta.GetKey(), meta, true)) {
meta.user = "0"; // check if it is a public store.
StoreMetaDataLocal localMetaData;
if (!MetaDataManager::GetInstance().LoadMeta(meta.GetKeyLocal(), localMetaData, true) ||
!localMetaData.isPublic || !MetaDataManager::GetInstance().LoadMeta(meta.GetKey(), meta, true)) {
ZLOGE("failed, no store meta. bundleName:%{public}s, storeId:%{public}s", meta.bundleName.c_str(),
meta.GetStoreAlias().c_str());
return { false, meta };
}
}
return { true, meta };
}
} // namespace OHOS::CloudData

View File

@ -39,17 +39,29 @@ public:
using AutoCache = DistributedData::AutoCache;
using StoreMetaData = DistributedData::StoreMetaData;
using SchemaMeta = DistributedData::SchemaMeta;
using TraceIds = std::map<std::string, std::string>;
using SyncStage = DistributedData::SyncStage;
using ReportParam = DistributedData::ReportParam;
static AutoCache::Store GetStore(const StoreMetaData &meta, int32_t user, bool mustBind = true);
class SyncInfo final {
public:
using Store = std::string;
using Stores = std::vector<Store>;
using Tables = std::vector<std::string>;
struct Param {
int32_t user;
std::string bundleName;
Store store;
Tables tables;
int32_t triggerMode = 0;
std::string prepareTraceId;
};
using MutliStoreTables = std::map<Store, Tables>;
explicit SyncInfo(int32_t user, const std::string &bundleName = "", const Store &store = "",
const Tables &tables = {}, int32_t triggerMode = 0);
SyncInfo(int32_t user, const std::string &bundleName, const Stores &stores);
SyncInfo(int32_t user, const std::string &bundleName, const MutliStoreTables &tables);
explicit SyncInfo(const Param &param);
void SetMode(int32_t mode);
void SetWait(int32_t wait);
void SetAsyncDetail(GenAsync asyncDetail);
@ -57,6 +69,7 @@ public:
void SetError(int32_t code) const;
void SetCompensation(bool isCompensation);
void SetTriggerMode(int32_t triggerMode);
void SetPrepareTraceId(const std::string &prepareTraceId);
std::shared_ptr<GenQuery> GenerateQuery(const std::string &store, const Tables &tables);
bool Contains(const std::string &storeName);
inline static constexpr const char *DEFAULT_ID = "default";
@ -74,6 +87,7 @@ public:
std::shared_ptr<GenQuery> query_;
bool isCompensation_ = false;
int32_t triggerMode_ = 0;
std::string prepareTraceId_;
};
SyncManager();
~SyncManager();
@ -81,13 +95,15 @@ public:
int32_t DoCloudSync(SyncInfo syncInfo);
int32_t StopCloudSync(int32_t user = 0);
int32_t QueryLastSyncInfo(const std::vector<QueryKey> &queryKeys, QueryLastResults &results);
void Report(const ReportParam &reportParam);
private:
using Event = DistributedData::Event;
using Task = ExecutorPool::Task;
using TaskId = ExecutorPool::TaskId;
using Duration = ExecutorPool::Duration;
using Retryer = std::function<bool(Duration interval, int32_t status, int32_t dbCode)>;
using Retryer =
std::function<bool(Duration interval, int32_t status, int32_t dbCode, const std::string &prepareTraceId)>;
using CloudInfo = DistributedData::CloudInfo;
using StoreInfo = DistributedData::StoreInfo;
using SyncStrategy = DistributedData::SyncStrategy;
@ -117,25 +133,29 @@ private:
void UpdateSchema(const SyncInfo &syncInfo);
std::function<void(const Event &)> GetSyncHandler(Retryer retryer);
std::function<void(const Event &)> GetClientChangeHandler();
Retryer GetRetryer(int32_t times, const SyncInfo &syncInfo);
Retryer GetRetryer(int32_t times, const SyncInfo &syncInfo, int32_t user);
RefCount GenSyncRef(uint64_t syncId);
int32_t Compare(uint64_t syncId, int32_t user);
GeneralError IsValid(SyncInfo &info, CloudInfo &cloud);
void UpdateStartSyncInfo(const std::vector<std::tuple<QueryKey, uint64_t>> &cloudSyncInfos);
void UpdateFinishSyncInfo(const QueryKey &queryKey, uint64_t syncId, int32_t code);
std::function<void(const DistributedData::GenDetails &result)> GetCallback(const GenAsync &async,
const StoreInfo &storeInfo, int32_t triggerMode);
const StoreInfo &storeInfo, int32_t triggerMode, const std::string &prepareTraceId, int32_t user);
std::function<bool()> GetPostEventTask(const std::vector<SchemaMeta> &schemas, CloudInfo &cloud, SyncInfo &info,
bool retry);
void DoExceptionalCallback(const GenAsync &async, GenDetails &details, const StoreInfo &storeInfo);
bool retry, const TraceIds &traceIds);
void DoExceptionalCallback(const GenAsync &async, GenDetails &details, const StoreInfo &storeInfo,
const std::string &prepareTraceId);
bool InitDefaultUser(int32_t &user);
std::function<void(const DistributedData::GenDetails &result)> RetryCallback(
const StoreInfo &storeInfo, Retryer retryer, int32_t triggerMode);
std::function<void(const DistributedData::GenDetails &result)> RetryCallback(const StoreInfo &storeInfo,
Retryer retryer, int32_t triggerMode, const std::string &prepareTraceId, int32_t user);
static void GetLastResults(
const std::string &storeId, std::map<SyncId, CloudSyncInfo> &infos, QueryLastResults &results);
void BatchUpdateFinishState(const std::vector<std::tuple<QueryKey, uint64_t>> &cloudSyncInfos, int32_t code);
bool NeedSaveSyncInfo(const QueryKey &queryKey);
std::function<void(const Event &)> GetLockChangeHandler();
void BatchReport(int32_t userId, const TraceIds &traceIds, SyncStage syncStage, int32_t errCode);
TraceIds GetPrepareTraceId(const SyncInfo &info, const CloudInfo &cloud);
std::pair<bool, StoreMetaData> GetMetaData(const StoreInfo &storeInfo);
static std::atomic<uint32_t> genId_;
std::shared_ptr<ExecutorPool> executor_;

View File

@ -350,13 +350,14 @@ KVDBGeneralStore::DBSyncCallback KVDBGeneralStore::GetDBSyncCompleteCB(DetailAsy
};
}
DBStatus KVDBGeneralStore::CloudSync(
const Devices &devices, DistributedDB::SyncMode cloudSyncMode, DetailAsync async, int64_t wait)
DBStatus KVDBGeneralStore::CloudSync(const Devices &devices, DistributedDB::SyncMode cloudSyncMode, DetailAsync async,
int64_t wait, const std::string &prepareTraceId)
{
DistributedDB::CloudSyncOption syncOption;
syncOption.devices = devices;
syncOption.mode = cloudSyncMode;
syncOption.waitTime = wait;
syncOption.prepareTraceId = prepareTraceId;
syncOption.lockAction = DistributedDB::LockAction::NONE;
if (storeInfo_.user == 0) {
std::vector<int32_t> users;
@ -368,13 +369,13 @@ DBStatus KVDBGeneralStore::CloudSync(
return delegate_->Sync(syncOption, GetDBProcessCB(async));
}
int32_t KVDBGeneralStore::Sync(const Devices &devices, GenQuery &query, DetailAsync async, SyncParam &syncParm)
int32_t KVDBGeneralStore::Sync(const Devices &devices, GenQuery &query, DetailAsync async, SyncParam &syncParam)
{
auto syncMode = GeneralStore::GetSyncMode(syncParm.mode);
auto syncMode = GeneralStore::GetSyncMode(syncParam.mode);
std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
if (delegate_ == nullptr) {
ZLOGE("store already closed! devices count:%{public}zu, the 1st:%{public}s, mode:%{public}d", devices.size(),
devices.empty() ? "null" : Anonymous::Change(*devices.begin()).c_str(), syncParm.mode);
devices.empty() ? "null" : Anonymous::Change(*devices.begin()).c_str(), syncParam.mode);
return GeneralError::E_ALREADY_CLOSED;
}
DBStatus dbStatus;
@ -383,10 +384,10 @@ int32_t KVDBGeneralStore::Sync(const Devices &devices, GenQuery &query, DetailAs
if (!enableCloud_) {
return GeneralError::E_NOT_SUPPORT;
}
dbStatus = CloudSync(devices, dbMode, async, syncParm.wait);
dbStatus = CloudSync(devices, dbMode, async, syncParam.wait, syncParam.prepareTraceId);
} else {
if (devices.empty()) {
ZLOGE("Devices is empty! mode:%{public}d", syncParm.mode);
ZLOGE("Devices is empty! mode:%{public}d", syncParam.mode);
return GeneralError::E_INVALID_ARGS;
}
KVDBQuery *kvQuery = nullptr;

View File

@ -94,7 +94,8 @@ private:
std::vector<uint8_t> GetNewKey(std::vector<uint8_t> &key, const std::string &uuid);
DBSyncCallback GetDBSyncCompleteCB(DetailAsync async);
DBProcessCB GetDBProcessCB(DetailAsync async);
DBStatus CloudSync(const Devices &devices, DistributedDB::SyncMode cloudSyncMode, DetailAsync async, int64_t wait);
DBStatus CloudSync(const Devices &devices, DistributedDB::SyncMode cloudSyncMode, DetailAsync async, int64_t wait,
const std::string &prepareTraceId);
void GetIdentifierParams(std::vector<std::string> &devices,
const std::vector<std::string> &uuids, int32_t authType);
class ObserverProxy : public DistributedDB::KvStoreObserver {

View File

@ -345,4 +345,12 @@ void RdbCloud::ConvertErrorField(DistributedData::VBuckets& extends)
errorField->second = ConvertStatus(static_cast<GeneralError>(*errCode));
}
}
void RdbCloud::SetPrepareTraceId(const std::string &traceId)
{
if (cloudDB_ == nullptr) {
return;
}
cloudDB_->SetPrepareTraceId(traceId);
}
} // namespace OHOS::DistributedRdb

View File

@ -56,6 +56,7 @@ public:
uint8_t GetLockFlag() const;
std::pair<GeneralError, uint32_t> LockCloudDB(FLAG flag);
GeneralError UnLockCloudDB(FLAG flag);
void SetPrepareTraceId(const std::string &traceId) override;
private:
static constexpr const char *TYPE_FIELD = "#_type";

View File

@ -521,9 +521,9 @@ int32_t RdbGeneralStore::Sync(const Devices &devices, GenQuery &query, DetailAsy
std::chrono::minutes(INTERVAL));
tasks_->Insert(syncId, { id, callback });
}
dbStatus =
delegate_->Sync({ devices, dbMode, dbQuery, syncParam.wait, (isPriority || highMode == MANUAL_SYNC_MODE),
syncParam.isCompensation, {}, highMode == AUTO_SYNC_MODE, LOCK_ACTION },
dbStatus = delegate_->Sync({ devices, dbMode, dbQuery, syncParam.wait,
(isPriority || highMode == MANUAL_SYNC_MODE), syncParam.isCompensation, {},
highMode == AUTO_SYNC_MODE, LOCK_ACTION, syncParam.prepareTraceId },
tasks_ != nullptr ? GetCB(syncId) : callback, syncId);
if (dbStatus == DBStatus::OK || tasks_ == nullptr) {
return ConvertStatus(dbStatus);

View File

@ -1507,7 +1507,8 @@ HWTEST_F(CloudDataTest, GetPostEventTask, TestSize.Level0)
info.tables_.insert_or_assign(TEST_CLOUD_STORE, value);
CloudData::SyncManager sync;
auto task = sync.GetPostEventTask(schemas, cloudInfo_, info, true);
std::map<std::string, std::string> traceIds;
auto task = sync.GetPostEventTask(schemas, cloudInfo_, info, true, traceIds);
auto ret = task();
EXPECT_TRUE(ret);
}
@ -1524,13 +1525,15 @@ HWTEST_F(CloudDataTest, GetRetryer, TestSize.Level0)
CloudData::SyncManager::SyncInfo info(user);
CloudData::SyncManager sync;
CloudData::SyncManager::Duration duration;
auto ret = sync.GetRetryer(CloudData::SyncManager::RETRY_TIMES, info)(duration, E_OK, E_OK);
std::string prepareTraceId;
auto ret = sync.GetRetryer(CloudData::SyncManager::RETRY_TIMES, info, user)(duration, E_OK, E_OK, prepareTraceId);
EXPECT_TRUE(ret);
ret = sync.GetRetryer(CloudData::SyncManager::RETRY_TIMES, info)(duration, E_SYNC_TASK_MERGED, E_SYNC_TASK_MERGED);
ret = sync.GetRetryer(CloudData::SyncManager::RETRY_TIMES, info, user)(duration, E_SYNC_TASK_MERGED,
E_SYNC_TASK_MERGED, prepareTraceId);
EXPECT_TRUE(ret);
ret = sync.GetRetryer(0, info)(duration, E_OK, E_OK);
ret = sync.GetRetryer(0, info, user)(duration, E_OK, E_OK, prepareTraceId);
EXPECT_TRUE(ret);
ret = sync.GetRetryer(0, info)(duration, E_SYNC_TASK_MERGED, E_SYNC_TASK_MERGED);
ret = sync.GetRetryer(0, info, user)(duration, E_SYNC_TASK_MERGED, E_SYNC_TASK_MERGED, prepareTraceId);
EXPECT_TRUE(ret);
}
@ -1550,21 +1553,22 @@ HWTEST_F(CloudDataTest, GetCallback, TestSize.Level0)
storeInfo.user = user;
storeInfo.bundleName = "testBundleName";
int32_t triggerMode = MODE_DEFAULT;
std::string prepareTraceId;
GenAsync async = nullptr;
sync.GetCallback(async, storeInfo, triggerMode)(result);
sync.GetCallback(async, storeInfo, triggerMode, prepareTraceId, user)(result);
int32_t process = 0;
async = [&process](const GenDetails &details) { process = details.begin()->second.progress; };
GenProgressDetail detail;
detail.progress = GenProgress::SYNC_IN_PROGRESS;
result.insert_or_assign("test", detail);
sync.GetCallback(async, storeInfo, triggerMode)(result);
sync.GetCallback(async, storeInfo, triggerMode, prepareTraceId, user)(result);
EXPECT_EQ(process, GenProgress::SYNC_IN_PROGRESS);
detail.progress = GenProgress::SYNC_FINISH;
result.insert_or_assign("test", detail);
storeInfo.user = -1;
sync.GetCallback(async, storeInfo, triggerMode)(result);
sync.GetCallback(async, storeInfo, triggerMode, prepareTraceId, user)(result);
storeInfo.user = user;
sync.GetCallback(async, storeInfo, triggerMode)(result);
sync.GetCallback(async, storeInfo, triggerMode, prepareTraceId, user)(result);
EXPECT_EQ(process, GenProgress::SYNC_FINISH);
}
@ -1613,22 +1617,24 @@ HWTEST_F(CloudDataTest, GetCloudSyncInfo, TestSize.Level0)
*/
HWTEST_F(CloudDataTest, RetryCallback, TestSize.Level0)
{
int32_t user = 100;
std::string prepareTraceId;
CloudData::SyncManager sync;
StoreInfo storeInfo;
int32_t retCode = -1;
CloudData::SyncManager::Retryer retry = [&retCode](CloudData::SyncManager::Duration interval, int32_t code,
int32_t dbCode) {
int32_t dbCode, const std::string &prepareTraceId) {
retCode = code;
return true;
};
DistributedData::GenDetails result;
auto task = sync.RetryCallback(storeInfo, retry, MODE_DEFAULT);
auto task = sync.RetryCallback(storeInfo, retry, MODE_DEFAULT, prepareTraceId, user);
task(result);
GenProgressDetail detail;
detail.progress = GenProgress::SYNC_IN_PROGRESS;
detail.code = 100;
result.insert_or_assign("test", detail);
task = sync.RetryCallback(storeInfo, retry, MODE_DEFAULT);
task = sync.RetryCallback(storeInfo, retry, MODE_DEFAULT, prepareTraceId, user);
task(result);
EXPECT_EQ(retCode, detail.code);
}

View File

@ -418,12 +418,13 @@ HWTEST_F(KVDBGeneralStoreTest, CloudSync, TestSize.Level0)
store->storeInfo_.user = 0;
auto cloudSyncMode = DistributedDB::SyncMode::SYNC_MODE_PUSH_ONLY;
store->SetEqualIdentifier(bundleName, storeName);
auto ret = store->CloudSync(devices, cloudSyncMode, asyncs, 0);
std::string prepareTraceId;
auto ret = store->CloudSync(devices, cloudSyncMode, asyncs, 0, prepareTraceId);
EXPECT_EQ(ret, DBStatus::OK);
store->storeInfo_.user = 1;
cloudSyncMode = DistributedDB::SyncMode::SYNC_MODE_CLOUD_FORCE_PUSH;
ret = store->CloudSync(devices, cloudSyncMode, asyncs, 0);
ret = store->CloudSync(devices, cloudSyncMode, asyncs, 0, prepareTraceId);
EXPECT_EQ(ret, DBStatus::OK);
}