mirror of
https://gitee.com/openharmony/distributeddatamgr_relational_store
synced 2024-11-27 01:01:02 +00:00
!1885 Fix lambda capture of this
Merge pull request !1885 from Anvette/master
This commit is contained in:
commit
3ed10557c8
@ -158,6 +158,20 @@ protected:
|
||||
private:
|
||||
using Stmt = std::shared_ptr<Statement>;
|
||||
using RdbParam = DistributedRdb::RdbSyncerParam;
|
||||
using Options = DistributedRdb::RdbService::Option;
|
||||
using Memo = DistributedRdb::PredicatesMemo;
|
||||
class CloudTables {
|
||||
public:
|
||||
int32_t AddTables(const std::vector<std::string> &tables);
|
||||
int32_t RmvTables(const std::vector<std::string> &tables);
|
||||
int32_t Changed(const std::string &table);
|
||||
std::set<std::string> Steal();
|
||||
|
||||
private:
|
||||
std::mutex mutex_;
|
||||
std::set<std::string> tables_;
|
||||
std::set<std::string> changes_;
|
||||
};
|
||||
|
||||
static void AfterOpen(const RdbParam ¶m, int32_t retry = 0);
|
||||
int InnerOpen();
|
||||
@ -170,7 +184,7 @@ private:
|
||||
std::pair<int32_t, Stmt> BeginExecuteSql(const std::string &sql);
|
||||
int GetDataBasePath(const std::string &databasePath, std::string &backupFilePath);
|
||||
void DoCloudSync(const std::string &table);
|
||||
int InnerSync(const DistributedRdb::RdbService::Option &option, const DistributedRdb::PredicatesMemo &predicates,
|
||||
static int InnerSync(const RdbParam ¶m, const Options &option, const Memo &predicates,
|
||||
const AsyncDetail &async);
|
||||
int InnerBackup(const std::string &databasePath,
|
||||
const std::vector<uint8_t> &destEncryptKey = std::vector<uint8_t>());
|
||||
@ -227,8 +241,7 @@ private:
|
||||
std::mutex mutex_;
|
||||
std::shared_ptr<ConnectionPool> connectionPool_ = nullptr;
|
||||
std::shared_ptr<DelayNotify> delayNotifier_ = nullptr;
|
||||
std::shared_ptr<std::set<std::string>> syncTables_ = nullptr;
|
||||
std::set<std::string> cloudTables_;
|
||||
std::shared_ptr<CloudTables> cloudInfo_ = std::make_shared<CloudTables>();
|
||||
std::map<std::string, std::list<std::shared_ptr<RdbStoreLocalObserver>>> localObservers_;
|
||||
std::map<std::string, std::list<sptr<RdbStoreLocalSharedObserver>>> localSharedObservers_;
|
||||
ConcurrentMap<std::string, std::string> attachedInfo_;
|
||||
|
@ -84,6 +84,18 @@ public:
|
||||
private:
|
||||
using Stmt = std::shared_ptr<Statement>;
|
||||
using RdbParam = DistributedRdb::RdbSyncerParam;
|
||||
class CloudTables {
|
||||
public:
|
||||
int32_t AddTables(const std::vector<std::string> &tables);
|
||||
int32_t RmvTables(const std::vector<std::string> &tables);
|
||||
int32_t Changed(const std::string &table);
|
||||
std::set<std::string> Steal();
|
||||
|
||||
private:
|
||||
std::mutex mutex_;
|
||||
std::set<std::string> tables_;
|
||||
std::set<std::string> changes_;
|
||||
};
|
||||
|
||||
int InnerOpen();
|
||||
void InitSyncerParam(const RdbStoreConfig &config, bool created);
|
||||
@ -133,8 +145,7 @@ private:
|
||||
std::string fileType_;
|
||||
std::mutex mutex_;
|
||||
std::shared_ptr<ConnectionPool> connectionPool_ = nullptr;
|
||||
std::shared_ptr<std::set<std::string>> syncTables_ = nullptr;
|
||||
std::set<std::string> cloudTables_;
|
||||
std::shared_ptr<CloudTables> cloudInfo_ = std::make_shared<CloudTables>();
|
||||
ConcurrentMap<std::string, std::string> attachedInfo_;
|
||||
ConcurrentMap<int64_t, std::shared_ptr<Connection>> trxConnMap_ = {};
|
||||
std::list<std::weak_ptr<Transaction>> transactions_;
|
||||
|
@ -322,9 +322,9 @@ int RdbStoreImpl::SetDistributedTables(const std::vector<std::string> &tables, i
|
||||
{
|
||||
std::unique_lock<decltype(rwMutex_)> lock(rwMutex_);
|
||||
if (distributedConfig.autoSync) {
|
||||
cloudTables_.insert(tables.begin(), tables.end());
|
||||
cloudInfo_->AddTables(tables);
|
||||
} else {
|
||||
std::for_each(tables.begin(), tables.end(), [this](const auto &table) { cloudTables_.erase(table); });
|
||||
cloudInfo_->RmvTables(tables);
|
||||
return E_OK;
|
||||
}
|
||||
}
|
||||
@ -387,23 +387,23 @@ int RdbStoreImpl::Sync(const SyncOption &option, const AbsRdbPredicates &predica
|
||||
rdbOption.mode = option.mode;
|
||||
rdbOption.isAsync = !option.isBlock;
|
||||
RdbRadar ret(Scene::SCENE_SYNC, __FUNCTION__, config_.GetBundleName());
|
||||
ret = InnerSync(rdbOption, predicate.GetDistributedPredicates(), async);
|
||||
ret = InnerSync(syncerParam_, rdbOption, predicate.GetDistributedPredicates(), async);
|
||||
return ret;
|
||||
}
|
||||
|
||||
int RdbStoreImpl::InnerSync(const DistributedRdb::RdbService::Option &option,
|
||||
const DistributedRdb::PredicatesMemo &predicates, const RdbStore::AsyncDetail &async)
|
||||
int RdbStoreImpl::InnerSync(const RdbParam ¶m, const Options &option, const Memo &predicates,
|
||||
const AsyncDetail &async)
|
||||
{
|
||||
auto [errCode, service] = RdbMgr::GetInstance().GetRdbService(syncerParam_);
|
||||
auto [errCode, service] = RdbMgr::GetInstance().GetRdbService(param);
|
||||
if (errCode == E_NOT_SUPPORT) {
|
||||
return errCode;
|
||||
}
|
||||
if (errCode != E_OK) {
|
||||
LOG_ERROR("GetRdbService is failed, err is %{public}d, bundleName is %{public}s.",
|
||||
errCode, syncerParam_.bundleName_.c_str());
|
||||
LOG_ERROR("GetRdbService is failed, err is %{public}d, bundleName is %{public}s.", errCode,
|
||||
param.bundleName_.c_str());
|
||||
return errCode;
|
||||
}
|
||||
errCode = service->Sync(syncerParam_, option, predicates, async);
|
||||
errCode = service->Sync(param, option, predicates, async);
|
||||
if (errCode != E_OK) {
|
||||
LOG_ERROR("Sync is failed, err is %{public}d.", errCode);
|
||||
return errCode;
|
||||
@ -821,7 +821,8 @@ int RdbStoreImpl::ModifyLockStatus(const AbsRdbPredicates &predicates, bool isLo
|
||||
if (errCode == E_WAIT_COMPENSATED_SYNC) {
|
||||
LOG_DEBUG("Start compensation sync.");
|
||||
DistributedRdb::RdbService::Option option = { DistributedRdb::TIME_FIRST, 0, true, true, true };
|
||||
InnerSync(option, AbsRdbPredicates(predicates.GetTableName()).GetDistributedPredicates(), nullptr);
|
||||
auto memo = AbsRdbPredicates(predicates.GetTableName()).GetDistributedPredicates();
|
||||
InnerSync(syncerParam_, option, memo, nullptr);
|
||||
return E_OK;
|
||||
}
|
||||
if (errCode != E_OK) {
|
||||
@ -2096,26 +2097,9 @@ std::string RdbStoreImpl::GetName()
|
||||
void RdbStoreImpl::DoCloudSync(const std::string &table)
|
||||
{
|
||||
#if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM) && !defined(ANDROID_PLATFORM) && !defined(IOS_PLATFORM)
|
||||
{
|
||||
std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
|
||||
if (cloudTables_.empty() || (!table.empty() && cloudTables_.find(table) == cloudTables_.end())) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(mutex_);
|
||||
if (syncTables_ == nullptr) {
|
||||
syncTables_ = std::make_shared<std::set<std::string>>();
|
||||
}
|
||||
auto empty = syncTables_->empty();
|
||||
if (table.empty()) {
|
||||
syncTables_->insert(cloudTables_.begin(), cloudTables_.end());
|
||||
} else {
|
||||
syncTables_->insert(table);
|
||||
}
|
||||
if (!empty) {
|
||||
return;
|
||||
}
|
||||
auto needSync = cloudInfo_->Changed(table);
|
||||
if (!needSync) {
|
||||
return;
|
||||
}
|
||||
auto pool = TaskExecutor::GetInstance().GetExecutor();
|
||||
if (pool == nullptr) {
|
||||
@ -2123,19 +2107,18 @@ void RdbStoreImpl::DoCloudSync(const std::string &table)
|
||||
}
|
||||
auto interval =
|
||||
std::chrono::duration_cast<std::chrono::steady_clock::duration>(std::chrono::milliseconds(INTERVAL));
|
||||
pool->Schedule(interval, [this]() {
|
||||
std::shared_ptr<std::set<std::string>> ptr;
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(mutex_);
|
||||
ptr = syncTables_;
|
||||
syncTables_ = nullptr;
|
||||
pool->Schedule(interval, [cloudInfo = std::weak_ptr<CloudTables>(cloudInfo_), param = syncerParam_]() {
|
||||
auto changeInfo = cloudInfo.lock();
|
||||
if (changeInfo == nullptr) {
|
||||
return ;
|
||||
}
|
||||
if (ptr == nullptr) {
|
||||
auto tables = changeInfo->Steal();
|
||||
if (tables.empty()) {
|
||||
return;
|
||||
}
|
||||
DistributedRdb::RdbService::Option option = { DistributedRdb::TIME_FIRST, 0, true, true };
|
||||
InnerSync(option,
|
||||
AbsRdbPredicates(std::vector<std::string>(ptr->begin(), ptr->end())).GetDistributedPredicates(), nullptr);
|
||||
auto memo = AbsRdbPredicates(std::vector<std::string>(tables.begin(), tables.end())).GetDistributedPredicates();
|
||||
InnerSync(param, option, memo, nullptr);
|
||||
});
|
||||
#endif
|
||||
}
|
||||
@ -2232,9 +2215,7 @@ int RdbStoreImpl::Restore(const std::string &backupPath, const std::vector<uint8
|
||||
Reportor::ReportRestore(Reportor::Create(config_, E_OK), corrupt);
|
||||
rebuild_ = RebuiltType::NONE;
|
||||
}
|
||||
if (!cloudTables_.empty()) {
|
||||
DoCloudSync("");
|
||||
}
|
||||
DoCloudSync("");
|
||||
return errCode;
|
||||
}
|
||||
|
||||
@ -2378,4 +2359,51 @@ std::pair<int32_t, std::shared_ptr<Transaction>> RdbStoreImpl::CreateTransaction
|
||||
transactions_.push_back(trans);
|
||||
return { errCode, trans };
|
||||
}
|
||||
|
||||
int32_t RdbStoreImpl::CloudTables::AddTables(const std::vector<std::string> &tables)
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(mutex_);
|
||||
for (auto &table : tables) {
|
||||
tables_.insert(table);
|
||||
}
|
||||
return E_OK;
|
||||
}
|
||||
|
||||
int32_t RdbStoreImpl::CloudTables::RmvTables(const std::vector<std::string> &tables)
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(mutex_);
|
||||
for (auto &table : tables) {
|
||||
tables_.erase(table);
|
||||
}
|
||||
return E_OK;
|
||||
}
|
||||
|
||||
int32_t RdbStoreImpl::CloudTables::Changed(const std::string &table)
|
||||
{
|
||||
bool needSync = false;
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(mutex_);
|
||||
if (tables_.empty() || (!table.empty() && tables_.find(table) == tables_.end())) {
|
||||
return needSync;
|
||||
}
|
||||
|
||||
needSync = changes_.empty();
|
||||
if (!table.empty()) {
|
||||
changes_.insert(table);
|
||||
} else {
|
||||
changes_.insert(tables_.begin(), tables_.end());
|
||||
}
|
||||
}
|
||||
return needSync;
|
||||
}
|
||||
|
||||
std::set<std::string> RdbStoreImpl::CloudTables::Steal()
|
||||
{
|
||||
std::set<std::string> result;
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(mutex_);
|
||||
result = std::move(changes_);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
} // namespace OHOS::NativeRdb
|
Loading…
Reference in New Issue
Block a user