del auto sync

Signed-off-by: yangliu <yangliu178@huawei.com>
This commit is contained in:
yangliu 2024-07-31 20:54:53 +08:00
parent 564a569e3f
commit 0defe00b4c
10 changed files with 405 additions and 2 deletions

View File

@ -63,9 +63,11 @@
"ability_base",
"ability_runtime",
"bundle_framework",
"access_token",
"common_event_service",
"c_utils",
"device_manager",
"dmsfwk",
"ffrt",
"hilog",
"hisysevent",

View File

@ -83,6 +83,11 @@ ohos_source_set("distributeddatafwk_src_file") {
]
sources = old_sources + kvdb_sources
if (qemu_disable) {
sources += [ "../../kvdb/src/auto_sync_timer.cpp" ]
} else {
sources += [ "../../kvdb/src/auto_sync_timer_mock.cpp" ]
}
configs = [ ":module_private_config" ]
deps = [
@ -92,6 +97,8 @@ ohos_source_set("distributeddatafwk_src_file") {
"//foundation/distributedhardware/device_manager/interfaces/inner_kits/native_cpp:devicemanagersdk",
]
external_deps = [
"access_token:libaccesstoken_sdk",
"access_token:libtokenid_sdk",
"c_utils:utils",
"file_api:securitylabel",
"hilog:libhilog",
@ -102,6 +109,9 @@ ohos_source_set("distributeddatafwk_src_file") {
"ipc:ipc_single",
"samgr:samgr_proxy",
]
if (dms_service_enable && qemu_disable) {
external_deps += [ "dmsfwk:distributed_sdk" ]
}
public_external_deps = [
"data_share:datashare_common",

View File

@ -0,0 +1,49 @@
/*
* Copyright (c) 2022 Huawei Device Co., Ltd.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef SDB_AUTO_SYNC_TIMER_H
#define SDB_AUTO_SYNC_TIMER_H
#include <set>
#include <vector>
#include "concurrent_map.h"
#include "kvdb_service.h"
#include "task_executor.h"
namespace OHOS::DistributedKv {
class AutoSyncTimer {
public:
static constexpr uint32_t FORCE_SYNC_INTERVAL = 200;
static constexpr uint32_t AUTO_SYNC_INTERVAL = 50;
static AutoSyncTimer &GetInstance();
void DoAutoSync(const std::string &appId, std::set<StoreId> storeIds);
private:
static constexpr size_t TIME_TASK_NUM = 5;
static constexpr size_t SYNC_STORE_NUM = 10;
AutoSyncTimer() = default;
~AutoSyncTimer() = default;
std::map<std::string, std::vector<StoreId>> GetStoreIds();
std::function<void()> ProcessTask() __attribute__((no_sanitize("cfi")));
void StartTimer();
void StopTimer();
void AddSyncStores(const std::string &appId, std::set<StoreId> storeIds);
bool HasSyncStores();
std::pair<bool, std::string> HasCollaboration(const std::string &appId);
ConcurrentMap<std::string, std::vector<StoreId>> stores_;
TaskExecutor::TaskId delaySyncTaskId_;
TaskExecutor::TaskId forceSyncTaskId_;
std::mutex mutex_;
};
} // namespace OHOS::DistributedKv
#endif // SDB_AUTO_SYNC_TIMER_H

View File

@ -131,11 +131,13 @@ private:
Status DoSync(SyncInfo &syncInfo, std::shared_ptr<SyncCallback> observer);
Status DoSyncExt(SyncInfo &syncInfo, std::shared_ptr<SyncCallback> observer);
Status DoClientSync(SyncInfo &syncInfo, std::shared_ptr<SyncCallback> observer);
void DoAutoSync();
Status SyncExt(const std::string &networkId, uint64_t sequenceId);
bool IsRemoteChanged(const std::string &deviceId);
void DoNotifyChange();
void Register();
bool isApplication_ = false;
bool autoSync_ = false;
bool cloudAutoSync_ = false;
bool isClientSync_ = false;

View File

@ -0,0 +1,150 @@
/*
* Copyright (c) 2022 Huawei Device Co., Ltd.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#define LOG_TAG "AutoSyncTimer"
#include "auto_sync_timer.h"
#include "dms_handler.h"
#include "ipc_skeleton.h"
#include "kvdb_service_client.h"
#include "log_print.h"
namespace OHOS::DistributedKv {
AutoSyncTimer &AutoSyncTimer::GetInstance()
{
static AutoSyncTimer instance;
return instance;
}
void AutoSyncTimer::StartTimer()
{
std::lock_guard<decltype(mutex_)> lockGuard(mutex_);
if (forceSyncTaskId_ == TaskExecutor::INVALID_TASK_ID) {
forceSyncTaskId_ =
TaskExecutor::GetInstance().Schedule(std::chrono::milliseconds(FORCE_SYNC_INTERVAL), ProcessTask());
}
if (delaySyncTaskId_ == TaskExecutor::INVALID_TASK_ID) {
delaySyncTaskId_ =
TaskExecutor::GetInstance().Schedule(std::chrono::milliseconds(AUTO_SYNC_INTERVAL), ProcessTask());
} else {
delaySyncTaskId_ =
TaskExecutor::GetInstance().Reset(delaySyncTaskId_, std::chrono::milliseconds(AUTO_SYNC_INTERVAL));
}
}
void AutoSyncTimer::DoAutoSync(const std::string &appId, std::set<StoreId> storeIds)
{
AddSyncStores(appId, std::move(storeIds));
StartTimer();
}
void AutoSyncTimer::AddSyncStores(const std::string &appId, std::set<StoreId> storeIds)
{
stores_.Compute(appId, [&storeIds](const auto &key, std::vector<StoreId> &value) {
std::set<StoreId> tempStores(value.begin(), value.end());
for (auto it = storeIds.begin(); it != storeIds.end(); it++) {
if (tempStores.count(*it) == 0) {
value.push_back(*it);
}
}
return !value.empty();
});
}
bool AutoSyncTimer::HasSyncStores()
{
return !stores_.Empty();
}
std::map<std::string, std::vector<StoreId>> AutoSyncTimer::GetStoreIds()
{
std::map<std::string, std::vector<StoreId>> stores;
int count = SYNC_STORE_NUM;
stores_.EraseIf([&stores, &count](const std::string &key, std::vector<StoreId> &value) {
int size = value.size();
if (size <= count) {
stores.insert({ key, std::move(value) });
count = count - size;
return true;
}
auto &innerStore = stores[key];
auto it = value.begin();
while (it != value.end() && count > 0) {
innerStore.push_back(*it);
it++;
count--;
}
value.erase(value.begin(), it);
return value.empty();
});
return stores;
}
std::function<void()> AutoSyncTimer::ProcessTask()
{
return [this]() {
StopTimer();
auto service = KVDBServiceClient::GetInstance();
if (service == nullptr) {
StartTimer();
return;
}
auto storeIds = GetStoreIds();
for (const auto &id : storeIds) {
auto res = HasCollaboration(id.first);
if (!res.first) {
continue;
}
KVDBService::SyncInfo syncInfo;
syncInfo.devices.push_back(res.second);
ZLOGD("DoSync appId:%{public}s store size:%{public}zu", id.first.c_str(), id.second.size());
for (const auto &storeId : id.second) {
service->Sync({ id.first }, storeId, syncInfo);
}
}
if (HasSyncStores()) {
StartTimer();
}
};
}
std::pair<bool, std::string> AutoSyncTimer::HasCollaboration(const std::string &appId)
{
std::vector<DistributedSchedule::EventNotify> events;
auto status = DistributedSchedule::DmsHandler::GetInstance().GetDSchedEventInfo(
DistributedSchedule::DMS_COLLABORATION, events);
if (status != SUCCESS) {
ZLOGE("Get collaboration events failed, status:%{public}d", status);
return { false, "" };
}
for (const auto &event : events) {
if (event.srcBundleName_ == appId || event.destBundleName_ == appId) {
ZLOGI("The application is collaboration, srcBundleName:%{public}s, destBundleName:%{public}s",
event.srcBundleName_.c_str(), event.destBundleName_.c_str());
return { true, std::move(event.dstNetworkId_) };
}
}
ZLOGD("The application is not collaboration, appId:%{public}s", appId.c_str());
return { false, "" };
}
void AutoSyncTimer::StopTimer()
{
std::lock_guard<decltype(mutex_)> lockGuard(mutex_);
TaskExecutor::GetInstance().Remove(forceSyncTaskId_);
TaskExecutor::GetInstance().Remove(delaySyncTaskId_);
forceSyncTaskId_ = TaskExecutor::INVALID_TASK_ID;
delaySyncTaskId_ = TaskExecutor::INVALID_TASK_ID;
}
} // namespace OHOS::DistributedKv

View File

@ -0,0 +1,132 @@
/*
* Copyright (c) 2022 Huawei Device Co., Ltd.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#define LOG_TAG "AutoSyncTimerMock"
#include "auto_sync_timer.h"
#include "kvdb_service_client.h"
#include "log_print.h"
namespace OHOS::DistributedKv {
AutoSyncTimer &AutoSyncTimer::GetInstance()
{
static AutoSyncTimer instance;
return instance;
}
void AutoSyncTimer::StartTimer()
{
std::lock_guard<decltype(mutex_)> lockGuard(mutex_);
if (forceSyncTaskId_ == TaskExecutor::INVALID_TASK_ID) {
forceSyncTaskId_ =
TaskExecutor::GetInstance().Schedule(std::chrono::milliseconds(FORCE_SYNC_INTERVAL), ProcessTask());
}
if (delaySyncTaskId_ == TaskExecutor::INVALID_TASK_ID) {
delaySyncTaskId_ =
TaskExecutor::GetInstance().Schedule(std::chrono::milliseconds(AUTO_SYNC_INTERVAL), ProcessTask());
} else {
delaySyncTaskId_ =
TaskExecutor::GetInstance().Reset(delaySyncTaskId_, std::chrono::milliseconds(AUTO_SYNC_INTERVAL));
}
}
void AutoSyncTimer::DoAutoSync(const std::string &appId, std::set<StoreId> storeIds)
{
AddSyncStores(appId, std::move(storeIds));
StartTimer();
}
void AutoSyncTimer::AddSyncStores(const std::string &appId, std::set<StoreId> storeIds)
{
stores_.Compute(appId, [&storeIds](const auto &key, std::vector<StoreId> &value) {
std::set<StoreId> tempStores(value.begin(), value.end());
for (auto it = storeIds.begin(); it != storeIds.end(); it++) {
if (tempStores.count(*it) == 0) {
value.push_back(*it);
}
}
return !value.empty();
});
}
bool AutoSyncTimer::HasSyncStores()
{
return !stores_.Empty();
}
std::map<std::string, std::vector<StoreId>> AutoSyncTimer::GetStoreIds()
{
std::map<std::string, std::vector<StoreId>> stores;
int count = SYNC_STORE_NUM;
stores_.EraseIf([&stores, &count](const std::string &key, std::vector<StoreId> &value) {
int size = value.size();
if (size <= count) {
stores.insert({ key, std::move(value) });
count = count - size;
return true;
}
auto &innerStore = stores[key];
auto it = value.begin();
while (it != value.end() && count > 0) {
innerStore.push_back(*it);
it++;
count--;
}
value.erase(value.begin(), it);
return value.empty();
});
return stores;
}
std::function<void()> AutoSyncTimer::ProcessTask()
{
return [this]() {
StopTimer();
auto service = KVDBServiceClient::GetInstance();
if (service == nullptr) {
StartTimer();
return;
}
auto storeIds = GetStoreIds();
for (const auto &id : storeIds) {
auto res = HasCollaboration(id.first);
if (!res.first) {
continue;
}
ZLOGD("DoSync appId:%{public}s store size:%{public}zu", id.first.c_str(), id.second.size());
for (const auto &storeId : id.second) {
KVDBService::SyncInfo syncInfo;
service->Sync({ id.first }, storeId, syncInfo);
}
}
if (HasSyncStores()) {
StartTimer();
}
};
}
std::pair<bool, std::string> AutoSyncTimer::HasCollaboration(const std::string &appId)
{
return { true, "" };
}
void AutoSyncTimer::StopTimer()
{
std::lock_guard<decltype(mutex_)> lockGuard(mutex_);
TaskExecutor::GetInstance().Remove(forceSyncTaskId_);
TaskExecutor::GetInstance().Remove(delaySyncTaskId_);
forceSyncTaskId_ = TaskExecutor::INVALID_TASK_ID;
delaySyncTaskId_ = TaskExecutor::INVALID_TASK_ID;
}
} // namespace OHOS::DistributedKv

View File

@ -15,9 +15,12 @@
#define LOG_TAG "SingleStoreImpl"
#include "single_store_impl.h"
#include "accesstoken_kit.h"
#include "auto_sync_timer.h"
#include "backup_manager.h"
#include "dds_trace.h"
#include "dev_manager.h"
#include "ipc_skeleton.h"
#include "kvdb_service_client.h"
#include "log_print.h"
#include "store_result_set.h"
@ -26,6 +29,7 @@
namespace OHOS::DistributedKv {
using namespace OHOS::DistributedDataDfx;
using namespace std::chrono;
using namespace Security::AccessToken;
SingleStoreImpl::SingleStoreImpl(
std::shared_ptr<DBStore> dbStore, const AppId &appId, const Options &options, const Convertor &cvt)
: convertor_(cvt), dbStore_(std::move(dbStore))
@ -42,6 +46,10 @@ SingleStoreImpl::SingleStoreImpl(
if (options.backup) {
BackupManager::GetInstance().Prepare(path, storeId_);
}
uint32_t tokenId = IPCSkeleton::GetSelfTokenID();
if (AccessTokenKit::GetTokenTypeFlag(tokenId) == TOKEN_HAP) {
isApplication_ = true;
}
}
SingleStoreImpl::~SingleStoreImpl()
@ -89,6 +97,7 @@ Status SingleStoreImpl::Put(const Key &key, const Value &value)
ZLOGE("status:0x%{public}x key:%{public}s, value size:%{public}zu", status,
StoreUtil::Anonymous(key.ToString()).c_str(), value.Size());
}
DoAutoSync();
DoNotifyChange();
return status;
}
@ -119,6 +128,7 @@ Status SingleStoreImpl::PutBatch(const std::vector<Entry> &entries)
if (status != SUCCESS) {
ZLOGE("status:0x%{public}x entries size:%{public}zu", status, entries.size());
}
DoAutoSync();
DoNotifyChange();
return status;
}
@ -142,6 +152,7 @@ Status SingleStoreImpl::Delete(const Key &key)
if (status != SUCCESS) {
ZLOGE("status:0x%{public}x key:%{public}s", status, StoreUtil::Anonymous(key.ToString()).c_str());
}
DoAutoSync();
DoNotifyChange();
return status;
}
@ -169,6 +180,7 @@ Status SingleStoreImpl::DeleteBatch(const std::vector<Key> &keys)
if (status != SUCCESS) {
ZLOGE("status:0x%{public}x keys size:%{public}zu", status, keys.size());
}
DoAutoSync();
DoNotifyChange();
return status;
}
@ -1008,7 +1020,7 @@ Status SingleStoreImpl::SetConfig(const StoreConfig &storeConfig)
void SingleStoreImpl::DoNotifyChange()
{
if (!autoSync_ && !cloudAutoSync_ && dataType_ == DataType::TYPE_DYNAMICAL) {
if (!cloudAutoSync_) {
return;
}
auto now = GetTimeStamp();
@ -1030,6 +1042,15 @@ void SingleStoreImpl::DoNotifyChange()
});
}
void SingleStoreImpl::DoAutoSync()
{
if (!autoSync_ || !isApplication_) {
return;
}
ZLOGD("app:%{public}s store:%{public}s!", appId_.c_str(), StoreUtil::Anonymous(storeId_).c_str());
AutoSyncTimer::GetInstance().DoAutoSync(appId_, { { storeId_ } });
}
void SingleStoreImpl::OnRemoteDied()
{
std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);

View File

@ -80,6 +80,11 @@ ohos_source_set("kvdb_src_file") {
]
sources = old_sources + kvdb_sources
if (qemu_disable) {
sources += [ "../src/auto_sync_timer.cpp" ]
} else {
sources += [ "../src/auto_sync_timer_mock.cpp" ]
}
configs = [ ":module_private_config" ]
deps = [
@ -89,6 +94,8 @@ ohos_source_set("kvdb_src_file") {
"//foundation/distributedhardware/device_manager/interfaces/inner_kits/native_cpp:devicemanagersdk",
]
external_deps = [
"access_token:libaccesstoken_sdk",
"access_token:libtokenid_sdk",
"c_utils:utils",
"file_api:securitylabel",
"hilog:libhilog",
@ -99,6 +106,9 @@ ohos_source_set("kvdb_src_file") {
"ipc:ipc_single",
"samgr:samgr_proxy",
]
if (dms_service_enable && qemu_disable) {
external_deps += [ "dmsfwk:distributed_sdk" ]
}
public_external_deps = [
"data_share:datashare_common",

View File

@ -102,6 +102,8 @@ deps_config = [
]
external_deps_config = [
"access_token:libaccesstoken_sdk",
"access_token:libtokenid_sdk",
"c_utils:utils",
"device_manager:devicemanagersdk",
"hisysevent:libhisysevent",
@ -124,12 +126,22 @@ ohos_shared_library("distributeddata_inner") {
debug = false
}
sources = old_source_config + kvdb_source_config
if (qemu_disable) {
sources +=
[ "../../../frameworks/innerkitsimpl/kvdb/src/auto_sync_timer.cpp" ]
} else {
sources += [
"../../../frameworks/innerkitsimpl/kvdb/src/auto_sync_timer_mock.cpp",
]
}
configs = [ ":distributeddatafwk_config" ]
public_configs = [ ":distributeddatafwk_public_config" ]
deps = deps_config
external_deps = external_deps_config
if (dms_service_enable && qemu_disable) {
external_deps += [ "dmsfwk:distributed_sdk" ]
}
innerapi_tags = [ "platformsdk" ]
subsystem_name = "distributeddatamgr"
part_name = "kv_store"

View File

@ -22,3 +22,18 @@ third_party_path = "//third_party"
use_platform_win = "${current_os}_${current_cpu}" == "mingw_x86_64"
use_platforn_mac = "${current_os}_${current_cpu}" == "mac_x64" ||
"${current_os}_${current_cpu}" == "mac_arm64"
declare_args() {
if (!defined(global_parts_info) ||
defined(global_parts_info.ability_dmsfwk)) {
dms_service_enable = true
} else {
dms_service_enable = false
}
if (product_name != "qemu-arm-linux-min") {
qemu_disable = true
} else {
qemu_disable = false
}
}