Increase the range of LimltWalSize and modify the checkpoint to asynchronous

Signed-off-by: changjiaxing <changjiaxing2@huawei.com>
Change-Id: I0d00506e107019c8549ca9af472a1d32b20d40a1
This commit is contained in:
changjiaxing 2024-01-13 21:50:20 +08:00
parent eb90b34e99
commit f31f04934c
15 changed files with 203 additions and 272 deletions

View File

@ -29,7 +29,7 @@ public:
void SetExecutorPool(std::shared_ptr<ExecutorPool> pool);
void SetTask(Task task);
void UpdateNotify(const DistributedRdb::RdbChangedData &changedData);
void SetAutoSyncInterval(uint32_t autoSyncInterval);
private:
static constexpr uint32_t FORCE_SYNC_INTERVAL = 100;
static constexpr uint32_t AUTO_SYNC_INTERVAL = 50;
@ -43,6 +43,8 @@ private:
void StartTimer();
void StopTimer();
void ExecuteTask();
void RestoreDefaultSyncInterval();
uint32_t autoSyncInterval_ = AUTO_SYNC_INTERVAL;
};
}
#endif // NATIVE_RDB_DELAY_NOTIFY_H

View File

@ -178,12 +178,15 @@ public:
int CleanDirtyData(const std::string &table, uint64_t cursor = UINT64_MAX) override;
private:
using ExecuteSqls = std::vector<std::pair<std::string, std::vector<std::vector<ValueObject>>>>;
int InnerOpen();
int CheckAttach(const std::string &sql);
int BeginExecuteSql(const std::string &sql, std::shared_ptr<SqliteConnection> &connection);
int FreeTransaction(std::shared_ptr<SqliteConnection> connection, const std::string &sql);
std::pair<std::string, std::vector<ValueObject>> GetInsertParams(
std::map<std::string, ValueObject> &valuesMap, const std::string &table);
ExecuteSqls GenerateSql(
const std::string &table, const std::vector<ValuesBucket> &initialBatchValues, int limitVariableNumber);
ExecuteSqls MakeExecuteSqls(
const std::string &sql, const std::vector<ValueObject> &args, int fieldSize, int limitVariableNumber);
int GetDataBasePath(const std::string &databasePath, std::string &backupFilePath);
int ExecuteSqlInner(const std::string &sql, const std::vector<ValueObject> &bindArgs);
int ExecuteGetLongInner(const std::string &sql, const std::vector<ValueObject> &bindArgs);
@ -232,6 +235,8 @@ private:
static constexpr char SCHEME_RDB[] = "rdb://";
std::map<std::string, std::list<std::shared_ptr<RdbStoreLocalObserver>>> localObservers_;
std::map<std::string, std::list<sptr<RdbStoreLocalSharedObserver>>> localSharedObservers_;
static constexpr uint32_t EXPANSION = 2;
static constexpr uint32_t AUTO_SYNC_MAX_INTERVAL = 20000;
};
} // namespace OHOS::NativeRdb
#endif

View File

@ -57,6 +57,7 @@ public:
int EndStepQuery();
void SetInTransaction(bool transaction);
bool IsInTransaction();
int TryCheckPoint();
int LimitWalSize();
#ifdef RDB_SUPPORT_ICU
int ConfigLocale(const std::string localeStr);
@ -65,7 +66,7 @@ public:
AppDataFwk::SharedBlock *sharedBlock, int startPos, int requiredPos, bool isCountAllRows);
int CleanDirtyData(const std::string &table, uint64_t cursor);
int RegisterCallBackObserver(const DataChangeCallback &clientChangedData);
int GetMaxVariableNumber();
private:
static constexpr const char *MERGE_ASSETS_FUNC = "merge_assets";
explicit SqliteConnection(bool isWriteConnection);
@ -111,8 +112,11 @@ private:
static constexpr uint32_t ITER_V1 = 5000;
static constexpr uint32_t ITERS[] = {NO_ITER, ITER_V1};
static constexpr uint32_t ITERS_COUNT = sizeof(ITERS) / sizeof(ITERS[0]);
bool isConfigured_ = false;
int maxVariableNumber_;
};
} // namespace NativeRdb
} // namespace OHOS
#endif
#endif

View File

@ -28,7 +28,6 @@
#include "rdb_store_config.h"
#include "sqlite_connection.h"
#include "base_transaction.h"
namespace OHOS {
namespace NativeRdb {
class SqliteConnectionPool {

View File

@ -28,7 +28,8 @@ public:
static constexpr int SOFT_HEAP_LIMIT = 8 * 1024 * 1024; /* 8MB */
static constexpr int DB_PAGE_SIZE = 4096; /* default page size : 4k */
static constexpr int DB_JOURNAL_SIZE = 1024 * 1024; /* default file size : 1M */
static constexpr int DB_WAL_SIZE_LIMIT = 200 * 1024 * 1024; /* default wal file maximum size : 200M */
static constexpr int DB_WAL_SIZE_LIMIT_MIN = 20 * 1024 * 1024; /* default wal file maximum size : 20M */
static constexpr int DB_WAL_SIZE_LIMIT_MAX = 200 * 1024 * 1024; /* default wal file maximum size : 200M */
static constexpr int WAL_AUTO_CHECKPOINT = 100; /* 100 pages */
static constexpr int APP_DEFAULT_UMASK = 0002;
static constexpr int SQLITE_MAX_COLUMN = 2000;

View File

@ -97,14 +97,17 @@ public:
int Delete(int &deletedRows, const AbsRdbPredicates &predicates) override;
private:
using ExecuteSqls = std::vector<std::pair<std::string, std::vector<std::vector<ValueObject>>>>;
int InnerOpen();
int CheckAttach(const std::string &sql);
bool PathToRealPath(const std::string &path, std::string &realPath);
std::string ExtractFilePath(const std::string &fileFullName);
int BeginExecuteSql(const std::string &sql, std::shared_ptr<SqliteConnection> &connection);
int FreeTransaction(std::shared_ptr<SqliteConnection> connection, const std::string &sql);
std::pair<std::string, std::vector<ValueObject>> GetInsertParams(
std::map<std::string, ValueObject> &valuesMap, const std::string &table);
ExecuteSqls GenerateSql(
const std::string &table, const std::vector<ValuesBucket> &initialBatchValues, int limitVariableNumber);
ExecuteSqls MakeExecuteSqls(
const std::string &sql, const std::vector<ValueObject> &args, int fieldSize, int limitVariableNumber);
int GetDataBasePath(const std::string &databasePath, std::string &backupFilePath);
int ExecuteSqlInner(const std::string &sql, const std::vector<ValueObject> &bindArgs);
int ExecuteGetLongInner(const std::string &sql, const std::vector<ValueObject> &bindArgs);
@ -125,6 +128,8 @@ private:
std::string name;
std::string fileType;
bool isEncrypt_;
static constexpr uint32_t EXPANSION = 2;
static constexpr uint32_t AUTO_SYNC_MAX_INTERVAL = 20000;
};
} // namespace OHOS::NativeRdb
#endif

View File

@ -55,13 +55,14 @@ public:
int EndStepQuery();
void SetInTransaction(bool transaction);
bool IsInTransaction();
int TryCheckPoint();
int LimitWalSize();
#ifdef RDB_SUPPORT_ICU
int ConfigLocale(const std::string localeStr);
#endif
int RegisterCallBackObserver(const DataChangeCallback &clientChangedData);
int GetMaxVariableNumber();
private:
static constexpr const char *MERGE_ASSETS_FUNC = "merge_assets";
explicit SqliteConnection(bool isWriteConnection);
@ -105,6 +106,9 @@ private:
static constexpr uint32_t ITER_V1 = 5000;
static constexpr uint32_t ITERS[] = {NO_ITER, ITER_V1};
static constexpr uint32_t ITERS_COUNT = sizeof(ITERS) / sizeof(ITERS[0]);
bool isConfigured_ = false;
int maxVariableNumber_;
};
} // namespace NativeRdb
} // namespace OHOS

View File

@ -27,7 +27,6 @@
#include "rdb_store_config.h"
#include "sqlite_connection.h"
#include "base_transaction.h"
namespace OHOS {
namespace NativeRdb {
class SqliteConnectionPool {

View File

@ -68,16 +68,16 @@ void DelayNotify::StartTimer()
if (pool_ == nullptr) {
return;
}
if (forceSyncTaskId_ == Executor::INVALID_TASK_ID) {
if (forceSyncTaskId_ == Executor::INVALID_TASK_ID && autoSyncInterval_ == AUTO_SYNC_INTERVAL) {
forceSyncTaskId_ = pool_->Schedule(std::chrono::milliseconds(FORCE_SYNC_INTERVAL),
[this]() { ExecuteTask(); });
}
if (delaySyncTaskId_ == Executor::INVALID_TASK_ID) {
delaySyncTaskId_ = pool_->Schedule(std::chrono::milliseconds(AUTO_SYNC_INTERVAL),
delaySyncTaskId_ = pool_->Schedule(std::chrono::milliseconds(autoSyncInterval_),
[this]() { ExecuteTask(); });
} else {
delaySyncTaskId_ =
pool_->Reset(delaySyncTaskId_, std::chrono::milliseconds(AUTO_SYNC_INTERVAL));
pool_->Reset(delaySyncTaskId_, std::chrono::milliseconds(autoSyncInterval_));
}
}
@ -95,6 +95,7 @@ void DelayNotify::ExecuteTask()
{
LOG_DEBUG("Notify data change.");
std::lock_guard<std::mutex> lock(mutex_);
RestoreDefaultSyncInterval();
StopTimer();
if (task_ != nullptr && changedData_.tableData.size() > 0) {
int errCode = task_(changedData_);
@ -105,4 +106,15 @@ void DelayNotify::ExecuteTask()
}
changedData_.tableData.clear();
}
void DelayNotify::SetAutoSyncInterval(uint32_t interval)
{
autoSyncInterval_ = interval;
}
void DelayNotify::RestoreDefaultSyncInterval()
{
autoSyncInterval_ = AUTO_SYNC_INTERVAL;
}
}

View File

@ -317,92 +317,128 @@ int RdbStoreImpl::BatchInsert(int64_t &outInsertNum, const std::string &table,
outInsertNum = 0;
return E_OK;
}
// prepare batch data & sql
std::vector<std::pair<std::string, std::vector<ValueObject>>> vecVectorObj;
for (auto iter = initialBatchValues.begin(); iter != initialBatchValues.end(); ++iter) {
auto values = (*iter).GetAll();
vecVectorObj.push_back(GetInsertParams(values, table));
}
// prepare BeginTransaction
int errCode = connectionPool->AcquireTransaction();
if (errCode != E_OK) {
return errCode;
}
auto connection = connectionPool->AcquireConnection(false);
if (connection == nullptr) {
return E_CON_OVER_LIMIT;
}
if (connection->IsInTransaction()) {
connectionPool->ReleaseTransaction();
auto executeSqlArgs = GenerateSql(table, initialBatchValues, connection->GetMaxVariableNumber());
if (executeSqlArgs.empty()) {
connectionPool->ReleaseConnection(connection);
LOG_ERROR("Transaction is in excuting.");
return E_TRANSACTION_IN_EXECUTE;
LOG_ERROR("empty, table=%{public}s, values:%{public}zu, max number:%{public}d.", table.c_str(),
initialBatchValues.size(), connection->GetMaxVariableNumber());
return E_INVALID_ARGS;
}
BaseTransaction transaction(0);
connection->SetInTransaction(true);
errCode = connection->ExecuteSql(transaction.GetTransactionStr());
if (errCode != E_OK) {
LOG_ERROR("BeginTransaction with error code %{public}d.", errCode);
connection->SetInTransaction(false);
connectionPool->ReleaseConnection(connection);
connectionPool->ReleaseTransaction();
return errCode;
#if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM) && !defined(ANDROID_PLATFORM) && !defined(IOS_PLATFORM)
if (delayNotifier_ != nullptr) {
delayNotifier_->SetAutoSyncInterval(AUTO_SYNC_MAX_INTERVAL);
}
// batch insert the values
for (auto iter = vecVectorObj.begin(); iter != vecVectorObj.end(); ++iter) {
outInsertNum++;
errCode = connection->ExecuteSql(iter->first, iter->second);
if (errCode != E_OK) {
LOG_ERROR("BatchInsert with error code %{public}d.", errCode);
outInsertNum = -1;
return FreeTransaction(connection, transaction.GetRollbackStr());
#endif
for (const auto &[sql, bindArgs] : executeSqlArgs) {
for (const auto &args : bindArgs) {
auto errCode = connection->ExecuteSql(sql, args);
if (errCode != E_OK) {
outInsertNum = -1;
connectionPool->ReleaseConnection(connection);
LOG_ERROR("BatchInsert failed, errCode : %{public}d, bindArgs : %{public}zu,"
"table : %{public}s, sql : %{public}s", errCode, bindArgs.size(), table.c_str(), sql.c_str());
return E_OK;
}
}
}
auto status = FreeTransaction(connection, transaction.GetCommitStr());
if (status == E_OK) {
DoCloudSync(table);
}
return status;
outInsertNum = initialBatchValues.size();
connectionPool->ReleaseConnection(connection);
DoCloudSync(table);
return E_OK;
}
std::pair<std::string, std::vector<ValueObject>> RdbStoreImpl::GetInsertParams(
std::map<std::string, ValueObject> &valuesMap, const std::string &table)
RdbStoreImpl::ExecuteSqls RdbStoreImpl::GenerateSql(
const std::string &table, const std::vector<ValuesBucket> &initialBatchValues, int limitVariableNumber)
{
std::string sql;
std::vector<ValueObject> bindArgs;
sql.append("INSERT INTO ").append(table).append("(");
size_t bindArgsSize = valuesMap.size();
if (bindArgsSize == 0) {
sql.append(") VALUES ()");
return std::make_pair(sql, bindArgs);
std::vector<std::vector<ValueObject>> values;
std::map<std::string, uint32_t> fields;
int32_t valuePosition = 0;
for (size_t row = 0; row < initialBatchValues.size(); row++) {
auto &vBucket = initialBatchValues[row];
if (values.max_size() == 0) {
values.reserve(vBucket.values_.size() * EXPANSION);
}
for (auto &[key, value] : vBucket.values_) {
if (value.GetType() == ValueObject::TYPE_ASSET ||
value.GetType() == ValueObject::TYPE_ASSETS) {
SetAssetStatus(value, AssetValue::STATUS_INSERT);
}
int32_t col = 0;
auto it = fields.find(key);
if (it == fields.end()) {
values.emplace_back(std::vector<ValueObject>(initialBatchValues.size()));
col = valuePosition;
fields.insert(std::pair{key, col});
valuePosition++;
} else {
col = it->second;
}
values[col][row] = value;
}
}
bindArgs.reserve(bindArgsSize);
auto valueIter = valuesMap.begin();
sql.append(valueIter->first);
if (valueIter->second.GetType() == ValueObject::TYPE_ASSET ||
valueIter->second.GetType() == ValueObject::TYPE_ASSETS) {
SetAssetStatus(valueIter->second, AssetValue::STATUS_INSERT);
}
bindArgs.push_back(valueIter->second);
++valueIter;
// prepare batch values & sql.columnName
for (; valueIter != valuesMap.end(); ++valueIter) {
sql.append(",").append(valueIter->first);
if (valueIter->second.GetType() == ValueObject::TYPE_ASSET ||
valueIter->second.GetType() == ValueObject::TYPE_ASSETS) {
SetAssetStatus(valueIter->second, AssetValue::STATUS_INSERT);
std::string sql = "INSERT OR REPLACE INTO " + table + " (";
std::vector<ValueObject> args(initialBatchValues.size() * values.size());
int32_t col = 0;
for (auto &[key, pos] : fields) {
for (size_t row = 0; row < initialBatchValues.size(); ++row) {
args[col + row * fields.size()] = std::move(values[pos][row]);
}
bindArgs.push_back(valueIter->second);
col++;
sql.append(key).append(",");
}
sql.append(") VALUES (").append(GetSqlArgs(bindArgsSize)).append(")");
// prepare sql.value
// put sql & vec<value> into map<sql, args>
return std::make_pair(sql, bindArgs);
sql.pop_back();
sql.append(") VALUES ");
return MakeExecuteSqls(sql, args, fields.size(), limitVariableNumber);
}
RdbStoreImpl::ExecuteSqls RdbStoreImpl::MakeExecuteSqls(
const std::string &sql, const std::vector<ValueObject> &args, int fieldSize, int limitVariableNumber)
{
if (fieldSize == 0) {
return ExecuteSqls();
}
size_t rowNumbers = args.size() / fieldSize;
size_t maxRowNumbersOneTimes = limitVariableNumber / fieldSize;
size_t executeTimes = rowNumbers / maxRowNumbersOneTimes;
size_t remainingRows = rowNumbers % maxRowNumbersOneTimes;
LOG_DEBUG("rowNumbers %{public}zu, maxRowNumbersOneTimes %{public}zu, executeTimes %{public}zu,"
"remainingRows %{public}zu, fieldSize %{public}d, limitVariableNumber %{public}d",
rowNumbers, maxRowNumbersOneTimes, executeTimes, remainingRows, fieldSize, limitVariableNumber);
std::string singleRowSqlArgs = "(" + GetSqlArgs(fieldSize) + ")";
auto appendAgsSql = [&singleRowSqlArgs, &sql] (size_t rowNumber) {
std::string sqlStr = sql;
for (size_t i = 0; i < rowNumber; ++i) {
sqlStr.append(singleRowSqlArgs).append(",");
}
sqlStr.pop_back();
return sqlStr;
};
std::string executeSql;
ExecuteSqls executeSqls;
auto start = args.begin();
if (executeTimes != 0) {
executeSql = appendAgsSql(maxRowNumbersOneTimes);
std::vector<std::vector<ValueObject>> sqlArgs;
size_t maxVariableNumbers = maxRowNumbersOneTimes * fieldSize;
for (size_t i = 0; i < executeTimes; ++i) {
std::vector<ValueObject> bindValueArgs(start, start + maxVariableNumbers);
sqlArgs.emplace_back(std::move(bindValueArgs));
start += maxVariableNumbers;
}
executeSqls.emplace_back(std::make_pair(executeSql, std::move(sqlArgs)));
}
if (remainingRows != 0) {
executeSql = appendAgsSql(remainingRows);
std::vector<std::vector<ValueObject>> sqlArgs(1, std::vector<ValueObject>(start, args.end()));
executeSqls.emplace_back(std::make_pair(executeSql, std::move(sqlArgs)));
}
return executeSqls;
}
int RdbStoreImpl::Replace(int64_t &outRowId, const std::string &table, const ValuesBucket &initialValues)

View File

@ -15,11 +15,13 @@
#include "sqlite_connection.h"
#include <sqlite3sym.h>
#include <sys/stat.h>
#include <cerrno>
#include <memory>
#include <new>
#include <sqlite3sym.h>
#include <sys/stat.h>
#ifdef RDB_SUPPORT_ICU
#include <unicode/ucol.h>
#endif
@ -28,12 +30,11 @@
#include "file_ex.h"
#include "logger.h"
#include "raw_data_parser.h"
#include "rdb_errno.h"
#include "sqlite_errno.h"
#include "sqlite_global_config.h"
#include "sqlite_utils.h"
#include "raw_data_parser.h"
#if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM) && !defined(ANDROID_PLATFORM) && !defined(IOS_PLATFORM)
#include "directory_ex.h"
#include "rdb_security_manager.h"
@ -135,8 +136,9 @@ int SqliteConnection::InnerOpen(const RdbStoreConfig &config, uint32_t retry)
#endif
return SQLiteError::ErrNo(errCode);
}
maxVariableNumber_ = sqlite3_limit(dbHandle, SQLITE_LIMIT_VARIABLE_NUMBER, -1);
errCode = Configure(config, retry, dbPath);
isConfigured_ = true;
if (errCode != E_OK) {
return errCode;
}
@ -594,6 +596,7 @@ int SqliteConnection::PrepareAndBind(const std::string &sql, const std::vector<V
LOG_ERROR("SqliteConnection dbHandle is nullptr");
return E_INVALID_STATEMENT;
}
int errCode = LimitWalSize();
if (errCode != E_OK) {
return errCode;
@ -930,30 +933,34 @@ bool SqliteConnection::IsInTransaction()
return inTransaction_;
}
int SqliteConnection::TryCheckPoint()
{
std::string walName = sqlite3_filename_wal(sqlite3_db_filename(dbHandle, "main"));
int fileSize = SqliteUtils::GetFileSize(walName);
if (fileSize <= GlobalExpr::DB_WAL_SIZE_LIMIT_MIN) {
return E_OK;
}
int errCode = sqlite3_wal_checkpoint_v2(dbHandle, nullptr, SQLITE_CHECKPOINT_TRUNCATE, nullptr, nullptr);
if (errCode != SQLITE_OK) {
LOG_WARN("sqlite3_wal_checkpoint_v2 failed err %{public}d.", errCode);
return E_ERROR;
}
return E_OK;
}
int SqliteConnection::LimitWalSize()
{
if (!isWriteConnection) {
if (!isConfigured_ || !isWriteConnection) {
return E_OK;
}
std::string walName = sqlite3_filename_wal(sqlite3_db_filename(dbHandle, "main"));
if (SqliteUtils::GetFileSize(walName) <= GlobalExpr::DB_WAL_SIZE_LIMIT) {
return E_OK;
}
int errCode = sqlite3_wal_checkpoint_v2(dbHandle, nullptr, SQLITE_CHECKPOINT_TRUNCATE, nullptr, nullptr);
if (errCode != SQLITE_OK) {
LOG_WARN("sqlite3_wal_checkpoint_v2 failed %{public}d.", errCode);
return E_WAL_SIZE_OVER_LIMIT;
}
int fileSize = SqliteUtils::GetFileSize(walName);
if (fileSize > GlobalExpr::DB_WAL_SIZE_LIMIT) {
if (fileSize > GlobalExpr::DB_WAL_SIZE_LIMIT_MAX) {
LOG_ERROR("the WAL file size over default limit, %{public}s size is %{public}d",
SqliteUtils::Anonymous(walName).c_str(), fileSize);
return E_WAL_SIZE_OVER_LIMIT;
}
return E_OK;
}
@ -1040,5 +1047,10 @@ void SqliteConnection::MergeAsset(ValueObject::Asset &oldAsset, ValueObject::Ass
return;
}
}
int SqliteConnection::GetMaxVariableNumber()
{
return maxVariableNumber_;
}
} // namespace NativeRdb
} // namespace OHOS

View File

@ -129,6 +129,7 @@ void SqliteConnectionPool::ReleaseConnection(std::shared_ptr<SqliteConnection> c
connection->DesFinalize();
if (connection == writeConnection_) {
ReleaseWriteConnection();
connection->TryCheckPoint();
} else {
ReleaseReadConnection(connection);
}
@ -226,7 +227,6 @@ int SqliteConnectionPool::InnerReOpenReadConnections()
return errCode;
}
int SqliteConnectionPool::ReOpenAvailableReadConnections()
{
std::unique_lock<std::mutex> lock(readMutex_);

View File

@ -103,7 +103,6 @@ if (is_ohos && !build_ohos_sdk) {
deps += [ "//third_party/sqlite:sqlite" ]
ldflags = [ "-Wl,--exclude-libs,ALL" ]
cflags_cc = [ "-fvisibility=hidden" ]
sources += [
"${relational_store_native_path}/rdb/src/abs_shared_result_set.cpp",
"${relational_store_native_path}/rdb/src/delay_notify.cpp",

View File

@ -175,7 +175,6 @@ HWTEST_F(RdbTransactionTest, RdbStore_Transaction_002, TestSize.Level1)
EXPECT_EQ(deletedRows, 3);
}
/**
* @tc.name: RdbStore_Transaction_003
* @tc.desc: test RdbStore BaseTransaction
@ -536,7 +535,7 @@ HWTEST_F(RdbTransactionTest, RdbStore_BatchInsert_003, TestSize.Level1)
number = INT_MIN;
error = store->BatchInsert(number, "test", valuesBuckets);
EXPECT_EQ(E_OK, error);
EXPECT_EQ(-1, number);
EXPECT_EQ(50, number);
resultSet = store->QuerySql("SELECT * FROM test");
resultSet->GetRowCount(rowCount);
@ -551,37 +550,4 @@ HWTEST_F(RdbTransactionTest, RdbStore_BatchInsert_003, TestSize.Level1)
}
resultSet->Close();
EXPECT_EQ(100, number);
}
/**
* @tc.name: RdbStore_BatchInsert_004
* @tc.desc: Abnormal testCase of transaction for batchInsert, if batchInsert in transaction
* @tc.type: FUNC
*/
HWTEST_F(RdbTransactionTest, RdbStore_BatchInsert_004, TestSize.Level1)
{
std::shared_ptr<RdbStore> &store = RdbTransactionTest::store;
store->ExecuteSql("delete from test");
int id = 0;
ValuesBucket values;
std::vector<ValuesBucket> valuesBuckets;
for (int i = 0; i < 10; i++) {
values.Clear();
values.PutInt("id", id + i);
values.PutString("name", "zhangsan");
valuesBuckets.push_back(values);
}
int error = store->BeginTransaction();
EXPECT_EQ(E_OK, error);
int64_t number = 0;
error = store->BatchInsert(number, "test", valuesBuckets);
EXPECT_EQ(E_TRANSACTION_IN_EXECUTE, error);
EXPECT_EQ(0, number);
error = store->Commit();
EXPECT_EQ(E_OK, error);
}

View File

@ -39,7 +39,7 @@ public:
static void MakeWalNoReachLimit();
static void MakeWalIncrease();
static void KeepReadConnection();
static ValuesBucket MakeValueBucket();
static ValuesBucket MakeValueBucket(const int &id);
static const std::string DATABASE_NAME;
static std::shared_ptr<RdbStore> store;
@ -52,7 +52,6 @@ std::shared_ptr<ResultSet> RdbWalLimitTest::resultSet = nullptr;
// create 1M data
std::vector<uint8_t> blobValue = RdbWalLimitTest::CreateRandomData(1 * 1024 * 1024);
ValuesBucket values = RdbWalLimitTest::MakeValueBucket();
class RdbWalLimitCallback : public RdbOpenCallback {
public:
@ -184,10 +183,10 @@ void RdbWalLimitTest::MakeWalNoReachLimit()
}
}
ValuesBucket RdbWalLimitTest::MakeValueBucket()
ValuesBucket RdbWalLimitTest::MakeValueBucket(const int &id)
{
ValuesBucket values;
values.PutInt("id", 200);
values.PutInt("id", id);
values.PutString("name", std::string("lisi"));
values.PutInt("age", 18);
values.PutDouble("salary", 200.8);
@ -197,7 +196,8 @@ ValuesBucket RdbWalLimitTest::MakeValueBucket()
/**
* @tc.name: RdbStore_WalOverLimit_001
* @tc.desc: Without reading data, the WAL size will not over default limit if write data continuously.
* @tc.desc: Without reading data or conducting transactions, if data is continuously written,
* the WAL size will not exceed the default limit.
* @tc.type: FUNC
* @tc.acquire: AR000HR0G5
*/
@ -206,19 +206,14 @@ HWTEST_F(RdbWalLimitTest, RdbStore_WalOverLimit_001, TestSize.Level1)
MakeWalIncrease();
int64_t id;
ValuesBucket values = MakeValueBucket(199);
values.PutBlob("blobType", blobValue);
EXPECT_EQ(store->Insert(id, "test", values), E_OK);
int changedRows;
EXPECT_EQ(store->Update(changedRows, "test", values, "id = ?", std::vector<std::string>{ "200" }), E_OK);
EXPECT_EQ(store->Replace(id, "test", values), E_OK);
int deletedRows;
EXPECT_EQ(store->Delete(deletedRows, "test", "id = 200"), E_OK);
std::vector<ValuesBucket> valuesBuckets;
for (int i = 0; i < 10; i++) {
valuesBuckets.push_back(values);
for (int i = 200; i < 210; i++) {
valuesBuckets.push_back(RdbWalLimitTest::MakeValueBucket(i));
}
int64_t insertNum = 0;
@ -229,7 +224,7 @@ HWTEST_F(RdbWalLimitTest, RdbStore_WalOverLimit_001, TestSize.Level1)
/**
* @tc.name: RdbStore_WalOverLimit_002
* @tc.desc: While reading data and writing data continuously if the WAL size not over default limit.
* @tc.desc: Before the wal file exceeds the limit, both read and write can be executed normally.
* @tc.type: FUNC
* @tc.acquire: AR000HR0G5
*/
@ -237,19 +232,11 @@ HWTEST_F(RdbWalLimitTest, RdbStore_WalOverLimit_002, TestSize.Level1)
{
KeepReadConnection();
MakeWalNoReachLimit();
int changedRows;
EXPECT_EQ(store->Update(changedRows, "test", values, "id = ?", std::vector<std::string>{ "21" }), E_OK);
int64_t id;
EXPECT_EQ(store->Replace(id, "test", values), E_OK);
int deletedRows;
EXPECT_EQ(store->Delete(deletedRows, "test", "id = 21"), E_OK);
ValuesBucket values = MakeValueBucket(20);
std::vector<ValuesBucket> valuesBuckets;
for (int i = 0; i < 10; i++) {
valuesBuckets.push_back(values);
for (int i = 21; i < 30; i++) {
valuesBuckets.push_back(MakeValueBucket(i));
}
int64_t insertNum = 0;
@ -260,116 +247,16 @@ HWTEST_F(RdbWalLimitTest, RdbStore_WalOverLimit_002, TestSize.Level1)
/**
* @tc.name: RdbStore_WalOverLimit_003
* @tc.desc: While reading data, can not write data continuously if the WAL size over default limit.
* @tc.desc: During transactions, the size of the wal file may exceed the limit.
* @tc.type: FUNC
* @tc.acquire: AR000HR0G5
*/
HWTEST_F(RdbWalLimitTest, RdbStore_WalOverLimit_003, TestSize.Level3)
{
KeepReadConnection();
ValuesBucket values = MakeValueBucket(200);
int64_t id;
store->BeginTransaction();
MakeWalReachLimit();
int64_t id;
EXPECT_EQ(store->Insert(id, "test", values), E_WAL_SIZE_OVER_LIMIT);
EXPECT_EQ(store->BeginTransaction(), E_WAL_SIZE_OVER_LIMIT);
int changedRows;
EXPECT_EQ(store->Update(changedRows, "test", values, "id = ?", std::vector<std::string>{ "200" }),
E_WAL_SIZE_OVER_LIMIT);
EXPECT_EQ(store->Replace(id, "test", values), E_WAL_SIZE_OVER_LIMIT);
int deletedRows;
EXPECT_EQ(store->Delete(deletedRows, "test", "id = 200"), E_WAL_SIZE_OVER_LIMIT);
std::vector<ValuesBucket> valuesBuckets;
for (int i = 0; i < 2; i++) {
valuesBuckets.push_back(values);
}
int64_t insertNum = 0;
EXPECT_EQ(store->BatchInsert(insertNum, "test", valuesBuckets), E_WAL_SIZE_OVER_LIMIT);
EXPECT_EQ(store->ExecuteSql("DELETE FROM test"), E_WAL_SIZE_OVER_LIMIT);
int64_t outLong;
EXPECT_EQ(store->ExecuteAndGetLong(outLong, "DELETE FROM test"), E_WAL_SIZE_OVER_LIMIT);
}
/**
* @tc.name: RdbStore_WalOverLimit_004
* @tc.desc: Writing data after closing read connection.
* @tc.type: FUNC
* @tc.acquire: AR000HR0G5
*/
HWTEST_F(RdbWalLimitTest, RdbStore_WalOverLimit_004, TestSize.Level3)
{
KeepReadConnection();
MakeWalReachLimit();
int64_t id;
EXPECT_EQ(store->Insert(id, "test", values), E_WAL_SIZE_OVER_LIMIT);
EXPECT_EQ(resultSet->Close(), E_OK);
{
store->BeginTransaction();
EXPECT_EQ(store->Insert(id, "test", values), E_OK);
store->Commit();
}
int changedRows;
EXPECT_EQ(store->Update(changedRows, "test", values, "id = ?", std::vector<std::string>{ "200" }), E_OK);
EXPECT_EQ(store->Replace(id, "test", values), E_OK);
int deletedRows;
EXPECT_EQ(store->Delete(deletedRows, "test", "id = 200"), E_OK);
std::vector<ValuesBucket> valuesBuckets;
for (int i = 0; i < 2; i++) {
valuesBuckets.push_back(values);
}
int64_t insertNum = 0;
EXPECT_EQ(store->BatchInsert(insertNum, "test", valuesBuckets), E_OK);
EXPECT_EQ(store->ExecuteSql("DELETE FROM test"), E_OK);
}
/**
* @tc.name: RdbStore_WalOverLimit_005
* @tc.desc: While reading data and transaction will fail if the WAL file size over default size.
* @tc.type: FUNC
* @tc.acquire: AR000HR0G5
*/
HWTEST_F(RdbWalLimitTest, RdbStore_WalOverLimit_005, TestSize.Level3)
{
KeepReadConnection();
int64_t id;
{
store->BeginTransaction();
MakeWalReachLimit();
EXPECT_EQ(store->Insert(id, "test", values), E_WAL_SIZE_OVER_LIMIT);
store->Commit();
}
int changedRows;
EXPECT_EQ(store->Update(changedRows, "test", values, "id = ?", std::vector<std::string>{ "200" }),
E_WAL_SIZE_OVER_LIMIT);
EXPECT_EQ(store->Replace(id, "test", values), E_WAL_SIZE_OVER_LIMIT);
int deletedRows;
EXPECT_EQ(store->Delete(deletedRows, "test", "id = 200"),
E_WAL_SIZE_OVER_LIMIT);
std::vector<ValuesBucket> valuesBuckets;
for (int i = 0; i < 2; i++) {
valuesBuckets.push_back(values);
}
int64_t insertNum = 0;
EXPECT_EQ(store->BatchInsert(insertNum, "test", valuesBuckets), E_WAL_SIZE_OVER_LIMIT);
EXPECT_EQ(store->ExecuteSql("DELETE FROM test"), E_WAL_SIZE_OVER_LIMIT);
}
store->Commit();
}