Signed-off-by: htt1997 <hutao105@huawei.com>
This commit is contained in:
htt1997 2024-11-19 15:29:35 +08:00
parent ba4faabcc4
commit e294186716
7 changed files with 162 additions and 58 deletions

View File

@ -158,6 +158,20 @@ protected:
private:
using Stmt = std::shared_ptr<Statement>;
using RdbParam = DistributedRdb::RdbSyncerParam;
using Options = DistributedRdb::RdbService::Option;
using Memo = DistributedRdb::PredicatesMemo;
class CloudTables {
public:
int32_t AddTables(const std::vector<std::string> &tables);
int32_t RmvTables(const std::vector<std::string> &tables);
bool Change(const std::string &table);
std::set<std::string> Steal();
private:
std::mutex mutex_;
std::set<std::string> tables_;
std::set<std::string> changes_;
};
static void AfterOpen(const RdbParam &param, int32_t retry = 0);
int InnerOpen();
@ -170,7 +184,7 @@ private:
std::pair<int32_t, Stmt> BeginExecuteSql(const std::string &sql);
int GetDataBasePath(const std::string &databasePath, std::string &backupFilePath);
void DoCloudSync(const std::string &table);
int InnerSync(const DistributedRdb::RdbService::Option &option, const DistributedRdb::PredicatesMemo &predicates,
static int InnerSync(const RdbParam &param, const Options &option, const Memo &predicates,
const AsyncDetail &async);
int InnerBackup(const std::string &databasePath,
const std::vector<uint8_t> &destEncryptKey = std::vector<uint8_t>());
@ -227,8 +241,7 @@ private:
std::mutex mutex_;
std::shared_ptr<ConnectionPool> connectionPool_ = nullptr;
std::shared_ptr<DelayNotify> delayNotifier_ = nullptr;
std::shared_ptr<std::set<std::string>> syncTables_ = nullptr;
std::set<std::string> cloudTables_;
std::shared_ptr<CloudTables> cloudInfo_ = std::make_shared<CloudTables>();
std::map<std::string, std::list<std::shared_ptr<RdbStoreLocalObserver>>> localObservers_;
std::map<std::string, std::list<sptr<RdbStoreLocalSharedObserver>>> localSharedObservers_;
ConcurrentMap<std::string, std::string> attachedInfo_;

View File

@ -84,6 +84,18 @@ public:
private:
using Stmt = std::shared_ptr<Statement>;
using RdbParam = DistributedRdb::RdbSyncerParam;
class CloudTables {
public:
int32_t AddTables(const std::vector<std::string> &tables);
int32_t RmvTables(const std::vector<std::string> &tables);
bool Change(const std::string &table);
std::set<std::string> Steal();
private:
std::mutex mutex_;
std::set<std::string> tables_;
std::set<std::string> changes_;
};
int InnerOpen();
void InitSyncerParam(const RdbStoreConfig &config, bool created);
@ -133,8 +145,7 @@ private:
std::string fileType_;
std::mutex mutex_;
std::shared_ptr<ConnectionPool> connectionPool_ = nullptr;
std::shared_ptr<std::set<std::string>> syncTables_ = nullptr;
std::set<std::string> cloudTables_;
std::shared_ptr<CloudTables> cloudInfo_ = std::make_shared<CloudTables>();
ConcurrentMap<std::string, std::string> attachedInfo_;
ConcurrentMap<int64_t, std::shared_ptr<Connection>> trxConnMap_ = {};
std::list<std::weak_ptr<Transaction>> transactions_;

View File

@ -77,6 +77,7 @@ using RdbMgr = DistributedRdb::RdbManagerImpl;
static constexpr const char *BEGIN_TRANSACTION_SQL = "begin;";
static constexpr const char *COMMIT_TRANSACTION_SQL = "commit;";
static constexpr const char *ROLLBACK_TRANSACTION_SQL = "rollback;";
static constexpr const char *BACKUP_RESTORE = "backup.restore";
constexpr int64_t TIME_OUT = 1500;
void RdbStoreImpl::InitSyncerParam(const RdbStoreConfig &config, bool created)
@ -318,9 +319,9 @@ int RdbStoreImpl::SetDistributedTables(const std::vector<std::string> &tables, i
{
std::unique_lock<decltype(rwMutex_)> lock(rwMutex_);
if (distributedConfig.autoSync) {
cloudTables_.insert(tables.begin(), tables.end());
cloudInfo_->AddTables(tables);
} else {
std::for_each(tables.begin(), tables.end(), [this](const auto &table) { cloudTables_.erase(table); });
cloudInfo_->RmvTables(tables);
return E_OK;
}
}
@ -383,23 +384,23 @@ int RdbStoreImpl::Sync(const SyncOption &option, const AbsRdbPredicates &predica
rdbOption.mode = option.mode;
rdbOption.isAsync = !option.isBlock;
RdbRadar ret(Scene::SCENE_SYNC, __FUNCTION__, config_.GetBundleName());
ret = InnerSync(rdbOption, predicate.GetDistributedPredicates(), async);
ret = InnerSync(syncerParam_, rdbOption, predicate.GetDistributedPredicates(), async);
return ret;
}
int RdbStoreImpl::InnerSync(const DistributedRdb::RdbService::Option &option,
const DistributedRdb::PredicatesMemo &predicates, const RdbStore::AsyncDetail &async)
int RdbStoreImpl::InnerSync(const RdbParam &param, const Options &option, const Memo &predicates,
const AsyncDetail &async)
{
auto [errCode, service] = RdbMgr::GetInstance().GetRdbService(syncerParam_);
auto [errCode, service] = RdbMgr::GetInstance().GetRdbService(param);
if (errCode == E_NOT_SUPPORT) {
return errCode;
}
if (errCode != E_OK) {
LOG_ERROR("GetRdbService is failed, err is %{public}d, bundleName is %{public}s.",
errCode, syncerParam_.bundleName_.c_str());
LOG_ERROR("GetRdbService is failed, err is %{public}d, bundleName is %{public}s.", errCode,
param.bundleName_.c_str());
return errCode;
}
errCode = service->Sync(syncerParam_, option, predicates, async);
errCode = service->Sync(param, option, predicates, async);
if (errCode != E_OK) {
LOG_ERROR("Sync is failed, err is %{public}d.", errCode);
return errCode;
@ -817,7 +818,8 @@ int RdbStoreImpl::ModifyLockStatus(const AbsRdbPredicates &predicates, bool isLo
if (errCode == E_WAIT_COMPENSATED_SYNC) {
LOG_DEBUG("Start compensation sync.");
DistributedRdb::RdbService::Option option = { DistributedRdb::TIME_FIRST, 0, true, true, true };
InnerSync(option, AbsRdbPredicates(predicates.GetTableName()).GetDistributedPredicates(), nullptr);
auto memo = AbsRdbPredicates(predicates.GetTableName()).GetDistributedPredicates();
InnerSync(syncerParam_, option, memo, nullptr);
return E_OK;
}
if (errCode != E_OK) {
@ -1011,7 +1013,9 @@ std::pair<int, int64_t> RdbStoreImpl::BatchInsert(const std::string &table, cons
for (const auto &[sql, bindArgs] : executeSqlArgs) {
auto [errCode, statement] = GetStatement(sql, connection);
if (statement == nullptr) {
continue;
LOG_ERROR("statement is nullptr, errCode:0x%{public}x, args:%{public}zu, table:%{public}s, sql:%{public}s",
errCode, bindArgs.size(), table.c_str(), sql.c_str());
return { E_OK, -1 };
}
for (const auto &args : bindArgs) {
auto errCode = statement->Execute(args);
@ -1419,7 +1423,7 @@ int RdbStoreImpl::Backup(const std::string &databasePath, const std::vector<uint
return ret;
}
RdbSecurityManager::KeyFiles keyFiles(backupFilePath);
RdbSecurityManager::KeyFiles keyFiles(path_ + BACKUP_RESTORE);
keyFiles.Lock();
auto deleteDirtyFiles = [&backupFilePath] {
@ -2039,26 +2043,9 @@ std::string RdbStoreImpl::GetName()
void RdbStoreImpl::DoCloudSync(const std::string &table)
{
#if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM) && !defined(ANDROID_PLATFORM) && !defined(IOS_PLATFORM)
{
std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
if (cloudTables_.empty() || (!table.empty() && cloudTables_.find(table) == cloudTables_.end())) {
return;
}
}
{
std::lock_guard<std::mutex> lock(mutex_);
if (syncTables_ == nullptr) {
syncTables_ = std::make_shared<std::set<std::string>>();
}
auto empty = syncTables_->empty();
if (table.empty()) {
syncTables_->insert(cloudTables_.begin(), cloudTables_.end());
} else {
syncTables_->insert(table);
}
if (!empty) {
return;
}
auto needSync = cloudInfo_->Change(table);
if (!needSync) {
return;
}
auto pool = TaskExecutor::GetInstance().GetExecutor();
if (pool == nullptr) {
@ -2066,19 +2053,18 @@ void RdbStoreImpl::DoCloudSync(const std::string &table)
}
auto interval =
std::chrono::duration_cast<std::chrono::steady_clock::duration>(std::chrono::milliseconds(INTERVAL));
pool->Schedule(interval, [this]() {
std::shared_ptr<std::set<std::string>> ptr;
{
std::lock_guard<std::mutex> lock(mutex_);
ptr = syncTables_;
syncTables_ = nullptr;
pool->Schedule(interval, [cloudInfo = std::weak_ptr<CloudTables>(cloudInfo_), param = syncerParam_]() {
auto changeInfo = cloudInfo.lock();
if (changeInfo == nullptr) {
return ;
}
if (ptr == nullptr) {
auto tables = changeInfo->Steal();
if (tables.empty()) {
return;
}
DistributedRdb::RdbService::Option option = { DistributedRdb::TIME_FIRST, 0, true, true };
InnerSync(option,
AbsRdbPredicates(std::vector<std::string>(ptr->begin(), ptr->end())).GetDistributedPredicates(), nullptr);
auto memo = AbsRdbPredicates(std::vector<std::string>(tables.begin(), tables.end())).GetDistributedPredicates();
InnerSync(param, option, memo, nullptr);
});
#endif
}
@ -2139,10 +2125,10 @@ int RdbStoreImpl::Restore(const std::string &backupPath, const std::vector<uint8
return E_ERROR;
}
RdbSecurityManager::KeyFiles keyFiles(path_ + BACKUP_RESTORE);
keyFiles.Lock();
std::string destPath;
bool isOK = TryGetMasterSlaveBackupPath(backupPath, destPath, true);
RdbSecurityManager::KeyFiles keyFiles(destPath);
keyFiles.Lock();
if (!isOK) {
int ret = GetDestPath(backupPath, destPath);
if (ret != E_OK) {
@ -2175,6 +2161,7 @@ int RdbStoreImpl::Restore(const std::string &backupPath, const std::vector<uint8
Reportor::ReportRestore(Reportor::Create(config_, E_OK), corrupt);
rebuild_ = RebuiltType::NONE;
}
DoCloudSync("");
return errCode;
}
@ -2318,4 +2305,51 @@ std::pair<int32_t, std::shared_ptr<Transaction>> RdbStoreImpl::CreateTransaction
transactions_.push_back(trans);
return { errCode, trans };
}
} // namespace OHOS::NativeRdb
int32_t RdbStoreImpl::CloudTables::AddTables(const std::vector<std::string> &tables)
{
std::lock_guard<std::mutex> lock(mutex_);
for (auto &table : tables) {
tables_.insert(table);
}
return E_OK;
}
int32_t RdbStoreImpl::CloudTables::RmvTables(const std::vector<std::string> &tables)
{
std::lock_guard<std::mutex> lock(mutex_);
for (auto &table : tables) {
tables_.erase(table);
}
return E_OK;
}
bool RdbStoreImpl::CloudTables::Change(const std::string &table)
{
bool needSync = false;
{
std::lock_guard<std::mutex> lock(mutex_);
if (tables_.empty() || (!table.empty() && tables_.find(table) == tables_.end())) {
return needSync;
}
// from empty, then need schedule the cloud sync, others only wait the schedule execute.
needSync = changes_.empty();
if (!table.empty()) {
changes_.insert(table);
} else {
changes_.insert(tables_.begin(), tables_.end());
}
}
return needSync;
}
std::set<std::string> RdbStoreImpl::CloudTables::Steal()
{
std::set<std::string> result;
{
std::lock_guard<std::mutex> lock(mutex_);
result = std::move(changes_);
}
return result;
}
} // namespace OHOS::NativeRdb

View File

@ -43,7 +43,6 @@ using namespace OHOS::Rdb;
using Reportor = RdbFaultHiViewReporter;
__attribute__((used))
const bool RdbStoreManager::regCollector_ = RdbFaultHiViewReporter::RegCollector(RdbStoreManager::Collector);
constexpr int RETRY_INTERVAL = 1;
RdbStoreManager &RdbStoreManager::GetInstance()
{
static RdbStoreManager manager;
@ -72,13 +71,6 @@ std::shared_ptr<RdbStoreImpl> RdbStoreManager::GetStoreFromCache(const RdbStoreC
}
if (!(rdbStore->GetConfig() == config)) {
storeCache_.erase(it);
auto pool = TaskExecutor::GetInstance().GetExecutor();
if (pool != nullptr) {
pool->Schedule(std::chrono::seconds(RETRY_INTERVAL), [config, rdbStore]() {
Reportor::Report(Reportor::Create(config, E_CONFIG_INVALID_CHANGE,
"ErrorType:Config diff!" + RdbStoreConfig::Format(rdbStore->GetConfig(), config)));
});
}
LOG_INFO("app[%{public}s:%{public}s] path[%{public}s]"
" cfg[%{public}d,%{public}d,%{public}d,%{public}d,%{public}d,%{public}d,%{public}d,%{public}s]"
" %{public}s",

View File

@ -406,5 +406,37 @@ describe('rdbStoreInsertTest', function () {
console.log(TAG + "************* testRdbStorebatchInsert002 end *************");
})
/**
* @tc.name: rdb batchInsert test
* @tc.number: SUB_DDM_AppDataFWK_JSRDB_batchInsert_0003
* @tc.desc: rdb batchInsert not exist column test
* @tc.require: issueIB3DGQ
*/
it('testRdbStorebatchInsert003', 0, async function () {
console.log(TAG + "************* testRdbStorebatchInsert003 start *************");
await rdbStore.executeSql("delete from test");
let valueBucketArray = new Array();
var u8 = new Uint8Array([1, 2, 3])
const valueBucket = {
"name": "zhangsan",
"age": 18,
"salary": 11.5,
"blobType": u8,
"notexistcolumn": 1,
}
valueBucketArray.push(valueBucket);
let errCode = await rdbStore.batchInsert("test", valueBucketArray);
expect(-1).assertEqual(errCode);
let resultSet = await rdbStore.querySql("SELECT * FROM test");
let count = resultSet.rowCount;
expect(0).assertEqual(count);
resultSet.close()
console.log(TAG + "************* testRdbStorebatchInsert003 end *************");
})
console.log(TAG + "*************Unit Test End*************");
})

View File

@ -2453,7 +2453,7 @@ describe('rdbPredicatesTest', function () {
console.log(TAG + "************* testIn0004 end *************");
})
/**
/**
* @tc.name predicates in normal test
* @tc.number SUB_DDM_AppDataFWK_JSRDB_Predicates_0194
* @tc.desc predicates in normal test

View File

@ -259,6 +259,28 @@ HWTEST_F(RdbStoreImplTest, Rdb_BatchInsertTest_001, TestSize.Level2)
EXPECT_EQ(E_OK, ret);
}
/* *
* @tc.name: Rdb_BatchInsertTest_002
* @tc.desc: Abnormal testCase for BatchInsert, if column is not exist.
* @tc.type: FUNC
*/
HWTEST_F(RdbStoreImplTest, Rdb_BatchInsertTest_002, TestSize.Level2)
{
store_->ExecuteSql("CREATE TABLE batchInsertTest "
"(id INTEGER PRIMARY KEY AUTOINCREMENT, data INTEGER, data1 INTEGER, "
"data2 INTEGER);");
ValuesBucket valuesBucket;
valuesBucket.PutInt("data", ValueObject(0));
valuesBucket.PutInt("data1", ValueObject(1));
valuesBucket.PutInt("data2", ValueObject(2));
valuesBucket.PutInt("NonexistentColumn", ValueObject(3));
std::vector<ValuesBucket> valuesBuckets = { valuesBucket };
int64_t insertNum = 1;
int ret = store_->BatchInsert(insertNum, "batchInsertTest", valuesBuckets);
EXPECT_EQ(-1, insertNum);
EXPECT_EQ(E_OK, ret);
}
/* *
* @tc.name: Rdb_QueryTest_001
* @tc.desc: Abnormal testCase for Query, if table name is empty