!1030 Increase the range of LimltWalSize and modify the checkpoint to asynchronous

Merge pull request !1030 from 常佳兴/master
This commit is contained in:
openharmony_ci 2024-01-18 09:56:59 +00:00 committed by Gitee
commit e2f358b8e0
No known key found for this signature in database
GPG Key ID: 173E9B9CA92EEF8F
15 changed files with 203 additions and 272 deletions

View File

@ -29,7 +29,7 @@ public:
void SetExecutorPool(std::shared_ptr<ExecutorPool> pool); void SetExecutorPool(std::shared_ptr<ExecutorPool> pool);
void SetTask(Task task); void SetTask(Task task);
void UpdateNotify(const DistributedRdb::RdbChangedData &changedData); void UpdateNotify(const DistributedRdb::RdbChangedData &changedData);
void SetAutoSyncInterval(uint32_t autoSyncInterval);
private: private:
static constexpr uint32_t FORCE_SYNC_INTERVAL = 100; static constexpr uint32_t FORCE_SYNC_INTERVAL = 100;
static constexpr uint32_t AUTO_SYNC_INTERVAL = 50; static constexpr uint32_t AUTO_SYNC_INTERVAL = 50;
@ -43,6 +43,8 @@ private:
void StartTimer(); void StartTimer();
void StopTimer(); void StopTimer();
void ExecuteTask(); void ExecuteTask();
void RestoreDefaultSyncInterval();
uint32_t autoSyncInterval_ = AUTO_SYNC_INTERVAL;
}; };
} }
#endif // NATIVE_RDB_DELAY_NOTIFY_H #endif // NATIVE_RDB_DELAY_NOTIFY_H

View File

@ -178,12 +178,15 @@ public:
int CleanDirtyData(const std::string &table, uint64_t cursor = UINT64_MAX) override; int CleanDirtyData(const std::string &table, uint64_t cursor = UINT64_MAX) override;
private: private:
using ExecuteSqls = std::vector<std::pair<std::string, std::vector<std::vector<ValueObject>>>>;
int InnerOpen(); int InnerOpen();
int CheckAttach(const std::string &sql); int CheckAttach(const std::string &sql);
int BeginExecuteSql(const std::string &sql, std::shared_ptr<SqliteConnection> &connection); int BeginExecuteSql(const std::string &sql, std::shared_ptr<SqliteConnection> &connection);
int FreeTransaction(std::shared_ptr<SqliteConnection> connection, const std::string &sql); int FreeTransaction(std::shared_ptr<SqliteConnection> connection, const std::string &sql);
std::pair<std::string, std::vector<ValueObject>> GetInsertParams( ExecuteSqls GenerateSql(
std::map<std::string, ValueObject> &valuesMap, const std::string &table); const std::string &table, const std::vector<ValuesBucket> &initialBatchValues, int limitVariableNumber);
ExecuteSqls MakeExecuteSqls(
const std::string &sql, const std::vector<ValueObject> &args, int fieldSize, int limitVariableNumber);
int GetDataBasePath(const std::string &databasePath, std::string &backupFilePath); int GetDataBasePath(const std::string &databasePath, std::string &backupFilePath);
int ExecuteSqlInner(const std::string &sql, const std::vector<ValueObject> &bindArgs); int ExecuteSqlInner(const std::string &sql, const std::vector<ValueObject> &bindArgs);
int ExecuteGetLongInner(const std::string &sql, const std::vector<ValueObject> &bindArgs); int ExecuteGetLongInner(const std::string &sql, const std::vector<ValueObject> &bindArgs);
@ -232,6 +235,8 @@ private:
static constexpr char SCHEME_RDB[] = "rdb://"; static constexpr char SCHEME_RDB[] = "rdb://";
std::map<std::string, std::list<std::shared_ptr<RdbStoreLocalObserver>>> localObservers_; std::map<std::string, std::list<std::shared_ptr<RdbStoreLocalObserver>>> localObservers_;
std::map<std::string, std::list<sptr<RdbStoreLocalSharedObserver>>> localSharedObservers_; std::map<std::string, std::list<sptr<RdbStoreLocalSharedObserver>>> localSharedObservers_;
static constexpr uint32_t EXPANSION = 2;
static constexpr uint32_t AUTO_SYNC_MAX_INTERVAL = 20000;
}; };
} // namespace OHOS::NativeRdb } // namespace OHOS::NativeRdb
#endif #endif

View File

@ -57,6 +57,7 @@ public:
int EndStepQuery(); int EndStepQuery();
void SetInTransaction(bool transaction); void SetInTransaction(bool transaction);
bool IsInTransaction(); bool IsInTransaction();
int TryCheckPoint();
int LimitWalSize(); int LimitWalSize();
#ifdef RDB_SUPPORT_ICU #ifdef RDB_SUPPORT_ICU
int ConfigLocale(const std::string localeStr); int ConfigLocale(const std::string localeStr);
@ -65,7 +66,7 @@ public:
AppDataFwk::SharedBlock *sharedBlock, int startPos, int requiredPos, bool isCountAllRows); AppDataFwk::SharedBlock *sharedBlock, int startPos, int requiredPos, bool isCountAllRows);
int CleanDirtyData(const std::string &table, uint64_t cursor); int CleanDirtyData(const std::string &table, uint64_t cursor);
int RegisterCallBackObserver(const DataChangeCallback &clientChangedData); int RegisterCallBackObserver(const DataChangeCallback &clientChangedData);
int GetMaxVariableNumber();
private: private:
static constexpr const char *MERGE_ASSETS_FUNC = "merge_assets"; static constexpr const char *MERGE_ASSETS_FUNC = "merge_assets";
explicit SqliteConnection(bool isWriteConnection); explicit SqliteConnection(bool isWriteConnection);
@ -111,8 +112,11 @@ private:
static constexpr uint32_t ITER_V1 = 5000; static constexpr uint32_t ITER_V1 = 5000;
static constexpr uint32_t ITERS[] = {NO_ITER, ITER_V1}; static constexpr uint32_t ITERS[] = {NO_ITER, ITER_V1};
static constexpr uint32_t ITERS_COUNT = sizeof(ITERS) / sizeof(ITERS[0]); static constexpr uint32_t ITERS_COUNT = sizeof(ITERS) / sizeof(ITERS[0]);
bool isConfigured_ = false;
int maxVariableNumber_;
}; };
} // namespace NativeRdb } // namespace NativeRdb
} // namespace OHOS } // namespace OHOS
#endif #endif

View File

@ -28,7 +28,6 @@
#include "rdb_store_config.h" #include "rdb_store_config.h"
#include "sqlite_connection.h" #include "sqlite_connection.h"
#include "base_transaction.h" #include "base_transaction.h"
namespace OHOS { namespace OHOS {
namespace NativeRdb { namespace NativeRdb {
class SqliteConnectionPool { class SqliteConnectionPool {

View File

@ -28,7 +28,8 @@ public:
static constexpr int SOFT_HEAP_LIMIT = 8 * 1024 * 1024; /* 8MB */ static constexpr int SOFT_HEAP_LIMIT = 8 * 1024 * 1024; /* 8MB */
static constexpr int DB_PAGE_SIZE = 4096; /* default page size : 4k */ static constexpr int DB_PAGE_SIZE = 4096; /* default page size : 4k */
static constexpr int DB_JOURNAL_SIZE = 1024 * 1024; /* default file size : 1M */ static constexpr int DB_JOURNAL_SIZE = 1024 * 1024; /* default file size : 1M */
static constexpr int DB_WAL_SIZE_LIMIT = 200 * 1024 * 1024; /* default wal file maximum size : 200M */ static constexpr int DB_WAL_SIZE_LIMIT_MIN = 20 * 1024 * 1024; /* default wal file maximum size : 20M */
static constexpr int DB_WAL_SIZE_LIMIT_MAX = 200 * 1024 * 1024; /* default wal file maximum size : 200M */
static constexpr int WAL_AUTO_CHECKPOINT = 100; /* 100 pages */ static constexpr int WAL_AUTO_CHECKPOINT = 100; /* 100 pages */
static constexpr int APP_DEFAULT_UMASK = 0002; static constexpr int APP_DEFAULT_UMASK = 0002;
static constexpr int SQLITE_MAX_COLUMN = 2000; static constexpr int SQLITE_MAX_COLUMN = 2000;

View File

@ -97,14 +97,17 @@ public:
int Delete(int &deletedRows, const AbsRdbPredicates &predicates) override; int Delete(int &deletedRows, const AbsRdbPredicates &predicates) override;
private: private:
using ExecuteSqls = std::vector<std::pair<std::string, std::vector<std::vector<ValueObject>>>>;
int InnerOpen(); int InnerOpen();
int CheckAttach(const std::string &sql); int CheckAttach(const std::string &sql);
bool PathToRealPath(const std::string &path, std::string &realPath); bool PathToRealPath(const std::string &path, std::string &realPath);
std::string ExtractFilePath(const std::string &fileFullName); std::string ExtractFilePath(const std::string &fileFullName);
int BeginExecuteSql(const std::string &sql, std::shared_ptr<SqliteConnection> &connection); int BeginExecuteSql(const std::string &sql, std::shared_ptr<SqliteConnection> &connection);
int FreeTransaction(std::shared_ptr<SqliteConnection> connection, const std::string &sql); int FreeTransaction(std::shared_ptr<SqliteConnection> connection, const std::string &sql);
std::pair<std::string, std::vector<ValueObject>> GetInsertParams( ExecuteSqls GenerateSql(
std::map<std::string, ValueObject> &valuesMap, const std::string &table); const std::string &table, const std::vector<ValuesBucket> &initialBatchValues, int limitVariableNumber);
ExecuteSqls MakeExecuteSqls(
const std::string &sql, const std::vector<ValueObject> &args, int fieldSize, int limitVariableNumber);
int GetDataBasePath(const std::string &databasePath, std::string &backupFilePath); int GetDataBasePath(const std::string &databasePath, std::string &backupFilePath);
int ExecuteSqlInner(const std::string &sql, const std::vector<ValueObject> &bindArgs); int ExecuteSqlInner(const std::string &sql, const std::vector<ValueObject> &bindArgs);
int ExecuteGetLongInner(const std::string &sql, const std::vector<ValueObject> &bindArgs); int ExecuteGetLongInner(const std::string &sql, const std::vector<ValueObject> &bindArgs);
@ -125,6 +128,8 @@ private:
std::string name; std::string name;
std::string fileType; std::string fileType;
bool isEncrypt_; bool isEncrypt_;
static constexpr uint32_t EXPANSION = 2;
static constexpr uint32_t AUTO_SYNC_MAX_INTERVAL = 20000;
}; };
} // namespace OHOS::NativeRdb } // namespace OHOS::NativeRdb
#endif #endif

View File

@ -55,13 +55,14 @@ public:
int EndStepQuery(); int EndStepQuery();
void SetInTransaction(bool transaction); void SetInTransaction(bool transaction);
bool IsInTransaction(); bool IsInTransaction();
int TryCheckPoint();
int LimitWalSize(); int LimitWalSize();
#ifdef RDB_SUPPORT_ICU #ifdef RDB_SUPPORT_ICU
int ConfigLocale(const std::string localeStr); int ConfigLocale(const std::string localeStr);
#endif #endif
int RegisterCallBackObserver(const DataChangeCallback &clientChangedData); int RegisterCallBackObserver(const DataChangeCallback &clientChangedData);
int GetMaxVariableNumber();
private: private:
static constexpr const char *MERGE_ASSETS_FUNC = "merge_assets"; static constexpr const char *MERGE_ASSETS_FUNC = "merge_assets";
explicit SqliteConnection(bool isWriteConnection); explicit SqliteConnection(bool isWriteConnection);
@ -105,6 +106,9 @@ private:
static constexpr uint32_t ITER_V1 = 5000; static constexpr uint32_t ITER_V1 = 5000;
static constexpr uint32_t ITERS[] = {NO_ITER, ITER_V1}; static constexpr uint32_t ITERS[] = {NO_ITER, ITER_V1};
static constexpr uint32_t ITERS_COUNT = sizeof(ITERS) / sizeof(ITERS[0]); static constexpr uint32_t ITERS_COUNT = sizeof(ITERS) / sizeof(ITERS[0]);
bool isConfigured_ = false;
int maxVariableNumber_;
}; };
} // namespace NativeRdb } // namespace NativeRdb
} // namespace OHOS } // namespace OHOS

View File

@ -27,7 +27,6 @@
#include "rdb_store_config.h" #include "rdb_store_config.h"
#include "sqlite_connection.h" #include "sqlite_connection.h"
#include "base_transaction.h" #include "base_transaction.h"
namespace OHOS { namespace OHOS {
namespace NativeRdb { namespace NativeRdb {
class SqliteConnectionPool { class SqliteConnectionPool {

View File

@ -68,16 +68,16 @@ void DelayNotify::StartTimer()
if (pool_ == nullptr) { if (pool_ == nullptr) {
return; return;
} }
if (forceSyncTaskId_ == Executor::INVALID_TASK_ID) { if (forceSyncTaskId_ == Executor::INVALID_TASK_ID && autoSyncInterval_ == AUTO_SYNC_INTERVAL) {
forceSyncTaskId_ = pool_->Schedule(std::chrono::milliseconds(FORCE_SYNC_INTERVAL), forceSyncTaskId_ = pool_->Schedule(std::chrono::milliseconds(FORCE_SYNC_INTERVAL),
[this]() { ExecuteTask(); }); [this]() { ExecuteTask(); });
} }
if (delaySyncTaskId_ == Executor::INVALID_TASK_ID) { if (delaySyncTaskId_ == Executor::INVALID_TASK_ID) {
delaySyncTaskId_ = pool_->Schedule(std::chrono::milliseconds(AUTO_SYNC_INTERVAL), delaySyncTaskId_ = pool_->Schedule(std::chrono::milliseconds(autoSyncInterval_),
[this]() { ExecuteTask(); }); [this]() { ExecuteTask(); });
} else { } else {
delaySyncTaskId_ = delaySyncTaskId_ =
pool_->Reset(delaySyncTaskId_, std::chrono::milliseconds(AUTO_SYNC_INTERVAL)); pool_->Reset(delaySyncTaskId_, std::chrono::milliseconds(autoSyncInterval_));
} }
} }
@ -95,6 +95,7 @@ void DelayNotify::ExecuteTask()
{ {
LOG_DEBUG("Notify data change."); LOG_DEBUG("Notify data change.");
std::lock_guard<std::mutex> lock(mutex_); std::lock_guard<std::mutex> lock(mutex_);
RestoreDefaultSyncInterval();
StopTimer(); StopTimer();
if (task_ != nullptr && changedData_.tableData.size() > 0) { if (task_ != nullptr && changedData_.tableData.size() > 0) {
int errCode = task_(changedData_); int errCode = task_(changedData_);
@ -105,4 +106,15 @@ void DelayNotify::ExecuteTask()
} }
changedData_.tableData.clear(); changedData_.tableData.clear();
} }
void DelayNotify::SetAutoSyncInterval(uint32_t interval)
{
autoSyncInterval_ = interval;
}
void DelayNotify::RestoreDefaultSyncInterval()
{
autoSyncInterval_ = AUTO_SYNC_INTERVAL;
}
} }

View File

@ -317,92 +317,128 @@ int RdbStoreImpl::BatchInsert(int64_t &outInsertNum, const std::string &table,
outInsertNum = 0; outInsertNum = 0;
return E_OK; return E_OK;
} }
// prepare batch data & sql
std::vector<std::pair<std::string, std::vector<ValueObject>>> vecVectorObj;
for (auto iter = initialBatchValues.begin(); iter != initialBatchValues.end(); ++iter) {
auto values = (*iter).GetAll();
vecVectorObj.push_back(GetInsertParams(values, table));
}
// prepare BeginTransaction
int errCode = connectionPool->AcquireTransaction();
if (errCode != E_OK) {
return errCode;
}
auto connection = connectionPool->AcquireConnection(false); auto connection = connectionPool->AcquireConnection(false);
if (connection == nullptr) { if (connection == nullptr) {
return E_CON_OVER_LIMIT; return E_CON_OVER_LIMIT;
} }
auto executeSqlArgs = GenerateSql(table, initialBatchValues, connection->GetMaxVariableNumber());
if (connection->IsInTransaction()) { if (executeSqlArgs.empty()) {
connectionPool->ReleaseTransaction();
connectionPool->ReleaseConnection(connection); connectionPool->ReleaseConnection(connection);
LOG_ERROR("Transaction is in excuting."); LOG_ERROR("empty, table=%{public}s, values:%{public}zu, max number:%{public}d.", table.c_str(),
return E_TRANSACTION_IN_EXECUTE; initialBatchValues.size(), connection->GetMaxVariableNumber());
return E_INVALID_ARGS;
} }
BaseTransaction transaction(0); #if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM) && !defined(ANDROID_PLATFORM) && !defined(IOS_PLATFORM)
connection->SetInTransaction(true); if (delayNotifier_ != nullptr) {
errCode = connection->ExecuteSql(transaction.GetTransactionStr()); delayNotifier_->SetAutoSyncInterval(AUTO_SYNC_MAX_INTERVAL);
if (errCode != E_OK) {
LOG_ERROR("BeginTransaction with error code %{public}d.", errCode);
connection->SetInTransaction(false);
connectionPool->ReleaseConnection(connection);
connectionPool->ReleaseTransaction();
return errCode;
} }
#endif
// batch insert the values for (const auto &[sql, bindArgs] : executeSqlArgs) {
for (auto iter = vecVectorObj.begin(); iter != vecVectorObj.end(); ++iter) { for (const auto &args : bindArgs) {
outInsertNum++; auto errCode = connection->ExecuteSql(sql, args);
errCode = connection->ExecuteSql(iter->first, iter->second); if (errCode != E_OK) {
if (errCode != E_OK) { outInsertNum = -1;
LOG_ERROR("BatchInsert with error code %{public}d.", errCode); connectionPool->ReleaseConnection(connection);
outInsertNum = -1; LOG_ERROR("BatchInsert failed, errCode : %{public}d, bindArgs : %{public}zu,"
return FreeTransaction(connection, transaction.GetRollbackStr()); "table : %{public}s, sql : %{public}s", errCode, bindArgs.size(), table.c_str(), sql.c_str());
return E_OK;
}
} }
} }
auto status = FreeTransaction(connection, transaction.GetCommitStr()); outInsertNum = initialBatchValues.size();
if (status == E_OK) { connectionPool->ReleaseConnection(connection);
DoCloudSync(table); DoCloudSync(table);
} return E_OK;
return status;
} }
std::pair<std::string, std::vector<ValueObject>> RdbStoreImpl::GetInsertParams( RdbStoreImpl::ExecuteSqls RdbStoreImpl::GenerateSql(
std::map<std::string, ValueObject> &valuesMap, const std::string &table) const std::string &table, const std::vector<ValuesBucket> &initialBatchValues, int limitVariableNumber)
{ {
std::string sql; std::vector<std::vector<ValueObject>> values;
std::vector<ValueObject> bindArgs; std::map<std::string, uint32_t> fields;
sql.append("INSERT INTO ").append(table).append("("); int32_t valuePosition = 0;
size_t bindArgsSize = valuesMap.size(); for (size_t row = 0; row < initialBatchValues.size(); row++) {
if (bindArgsSize == 0) { auto &vBucket = initialBatchValues[row];
sql.append(") VALUES ()"); if (values.max_size() == 0) {
return std::make_pair(sql, bindArgs); values.reserve(vBucket.values_.size() * EXPANSION);
}
for (auto &[key, value] : vBucket.values_) {
if (value.GetType() == ValueObject::TYPE_ASSET ||
value.GetType() == ValueObject::TYPE_ASSETS) {
SetAssetStatus(value, AssetValue::STATUS_INSERT);
}
int32_t col = 0;
auto it = fields.find(key);
if (it == fields.end()) {
values.emplace_back(std::vector<ValueObject>(initialBatchValues.size()));
col = valuePosition;
fields.insert(std::pair{key, col});
valuePosition++;
} else {
col = it->second;
}
values[col][row] = value;
}
} }
bindArgs.reserve(bindArgsSize); std::string sql = "INSERT OR REPLACE INTO " + table + " (";
auto valueIter = valuesMap.begin(); std::vector<ValueObject> args(initialBatchValues.size() * values.size());
sql.append(valueIter->first); int32_t col = 0;
if (valueIter->second.GetType() == ValueObject::TYPE_ASSET || for (auto &[key, pos] : fields) {
valueIter->second.GetType() == ValueObject::TYPE_ASSETS) { for (size_t row = 0; row < initialBatchValues.size(); ++row) {
SetAssetStatus(valueIter->second, AssetValue::STATUS_INSERT); args[col + row * fields.size()] = std::move(values[pos][row]);
}
bindArgs.push_back(valueIter->second);
++valueIter;
// prepare batch values & sql.columnName
for (; valueIter != valuesMap.end(); ++valueIter) {
sql.append(",").append(valueIter->first);
if (valueIter->second.GetType() == ValueObject::TYPE_ASSET ||
valueIter->second.GetType() == ValueObject::TYPE_ASSETS) {
SetAssetStatus(valueIter->second, AssetValue::STATUS_INSERT);
} }
bindArgs.push_back(valueIter->second); col++;
sql.append(key).append(",");
} }
sql.append(") VALUES (").append(GetSqlArgs(bindArgsSize)).append(")"); sql.pop_back();
// prepare sql.value sql.append(") VALUES ");
// put sql & vec<value> into map<sql, args> return MakeExecuteSqls(sql, args, fields.size(), limitVariableNumber);
return std::make_pair(sql, bindArgs); }
RdbStoreImpl::ExecuteSqls RdbStoreImpl::MakeExecuteSqls(
const std::string &sql, const std::vector<ValueObject> &args, int fieldSize, int limitVariableNumber)
{
if (fieldSize == 0) {
return ExecuteSqls();
}
size_t rowNumbers = args.size() / fieldSize;
size_t maxRowNumbersOneTimes = limitVariableNumber / fieldSize;
size_t executeTimes = rowNumbers / maxRowNumbersOneTimes;
size_t remainingRows = rowNumbers % maxRowNumbersOneTimes;
LOG_DEBUG("rowNumbers %{public}zu, maxRowNumbersOneTimes %{public}zu, executeTimes %{public}zu,"
"remainingRows %{public}zu, fieldSize %{public}d, limitVariableNumber %{public}d",
rowNumbers, maxRowNumbersOneTimes, executeTimes, remainingRows, fieldSize, limitVariableNumber);
std::string singleRowSqlArgs = "(" + GetSqlArgs(fieldSize) + ")";
auto appendAgsSql = [&singleRowSqlArgs, &sql] (size_t rowNumber) {
std::string sqlStr = sql;
for (size_t i = 0; i < rowNumber; ++i) {
sqlStr.append(singleRowSqlArgs).append(",");
}
sqlStr.pop_back();
return sqlStr;
};
std::string executeSql;
ExecuteSqls executeSqls;
auto start = args.begin();
if (executeTimes != 0) {
executeSql = appendAgsSql(maxRowNumbersOneTimes);
std::vector<std::vector<ValueObject>> sqlArgs;
size_t maxVariableNumbers = maxRowNumbersOneTimes * fieldSize;
for (size_t i = 0; i < executeTimes; ++i) {
std::vector<ValueObject> bindValueArgs(start, start + maxVariableNumbers);
sqlArgs.emplace_back(std::move(bindValueArgs));
start += maxVariableNumbers;
}
executeSqls.emplace_back(std::make_pair(executeSql, std::move(sqlArgs)));
}
if (remainingRows != 0) {
executeSql = appendAgsSql(remainingRows);
std::vector<std::vector<ValueObject>> sqlArgs(1, std::vector<ValueObject>(start, args.end()));
executeSqls.emplace_back(std::make_pair(executeSql, std::move(sqlArgs)));
}
return executeSqls;
} }
int RdbStoreImpl::Replace(int64_t &outRowId, const std::string &table, const ValuesBucket &initialValues) int RdbStoreImpl::Replace(int64_t &outRowId, const std::string &table, const ValuesBucket &initialValues)

View File

@ -15,11 +15,13 @@
#include "sqlite_connection.h" #include "sqlite_connection.h"
#include <sqlite3sym.h>
#include <sys/stat.h>
#include <cerrno> #include <cerrno>
#include <memory> #include <memory>
#include <new> #include <new>
#include <sqlite3sym.h>
#include <sys/stat.h>
#ifdef RDB_SUPPORT_ICU #ifdef RDB_SUPPORT_ICU
#include <unicode/ucol.h> #include <unicode/ucol.h>
#endif #endif
@ -28,12 +30,11 @@
#include "file_ex.h" #include "file_ex.h"
#include "logger.h" #include "logger.h"
#include "raw_data_parser.h"
#include "rdb_errno.h" #include "rdb_errno.h"
#include "sqlite_errno.h" #include "sqlite_errno.h"
#include "sqlite_global_config.h" #include "sqlite_global_config.h"
#include "sqlite_utils.h" #include "sqlite_utils.h"
#include "raw_data_parser.h"
#if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM) && !defined(ANDROID_PLATFORM) && !defined(IOS_PLATFORM) #if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM) && !defined(ANDROID_PLATFORM) && !defined(IOS_PLATFORM)
#include "directory_ex.h" #include "directory_ex.h"
#include "rdb_security_manager.h" #include "rdb_security_manager.h"
@ -135,8 +136,9 @@ int SqliteConnection::InnerOpen(const RdbStoreConfig &config, uint32_t retry)
#endif #endif
return SQLiteError::ErrNo(errCode); return SQLiteError::ErrNo(errCode);
} }
maxVariableNumber_ = sqlite3_limit(dbHandle, SQLITE_LIMIT_VARIABLE_NUMBER, -1);
errCode = Configure(config, retry, dbPath); errCode = Configure(config, retry, dbPath);
isConfigured_ = true;
if (errCode != E_OK) { if (errCode != E_OK) {
return errCode; return errCode;
} }
@ -594,6 +596,7 @@ int SqliteConnection::PrepareAndBind(const std::string &sql, const std::vector<V
LOG_ERROR("SqliteConnection dbHandle is nullptr"); LOG_ERROR("SqliteConnection dbHandle is nullptr");
return E_INVALID_STATEMENT; return E_INVALID_STATEMENT;
} }
int errCode = LimitWalSize(); int errCode = LimitWalSize();
if (errCode != E_OK) { if (errCode != E_OK) {
return errCode; return errCode;
@ -929,30 +932,34 @@ bool SqliteConnection::IsInTransaction()
return inTransaction_; return inTransaction_;
} }
int SqliteConnection::TryCheckPoint()
{
std::string walName = sqlite3_filename_wal(sqlite3_db_filename(dbHandle, "main"));
int fileSize = SqliteUtils::GetFileSize(walName);
if (fileSize <= GlobalExpr::DB_WAL_SIZE_LIMIT_MIN) {
return E_OK;
}
int errCode = sqlite3_wal_checkpoint_v2(dbHandle, nullptr, SQLITE_CHECKPOINT_TRUNCATE, nullptr, nullptr);
if (errCode != SQLITE_OK) {
LOG_WARN("sqlite3_wal_checkpoint_v2 failed err %{public}d.", errCode);
return E_ERROR;
}
return E_OK;
}
int SqliteConnection::LimitWalSize() int SqliteConnection::LimitWalSize()
{ {
if (!isWriteConnection) { if (!isConfigured_ || !isWriteConnection) {
return E_OK; return E_OK;
} }
std::string walName = sqlite3_filename_wal(sqlite3_db_filename(dbHandle, "main")); std::string walName = sqlite3_filename_wal(sqlite3_db_filename(dbHandle, "main"));
if (SqliteUtils::GetFileSize(walName) <= GlobalExpr::DB_WAL_SIZE_LIMIT) {
return E_OK;
}
int errCode = sqlite3_wal_checkpoint_v2(dbHandle, nullptr, SQLITE_CHECKPOINT_TRUNCATE, nullptr, nullptr);
if (errCode != SQLITE_OK) {
LOG_WARN("sqlite3_wal_checkpoint_v2 failed %{public}d.", errCode);
return E_WAL_SIZE_OVER_LIMIT;
}
int fileSize = SqliteUtils::GetFileSize(walName); int fileSize = SqliteUtils::GetFileSize(walName);
if (fileSize > GlobalExpr::DB_WAL_SIZE_LIMIT) { if (fileSize > GlobalExpr::DB_WAL_SIZE_LIMIT_MAX) {
LOG_ERROR("the WAL file size over default limit, %{public}s size is %{public}d", LOG_ERROR("the WAL file size over default limit, %{public}s size is %{public}d",
SqliteUtils::Anonymous(walName).c_str(), fileSize); SqliteUtils::Anonymous(walName).c_str(), fileSize);
return E_WAL_SIZE_OVER_LIMIT; return E_WAL_SIZE_OVER_LIMIT;
} }
return E_OK; return E_OK;
} }
@ -1039,5 +1046,10 @@ void SqliteConnection::MergeAsset(ValueObject::Asset &oldAsset, ValueObject::Ass
return; return;
} }
} }
int SqliteConnection::GetMaxVariableNumber()
{
return maxVariableNumber_;
}
} // namespace NativeRdb } // namespace NativeRdb
} // namespace OHOS } // namespace OHOS

View File

@ -129,6 +129,7 @@ void SqliteConnectionPool::ReleaseConnection(std::shared_ptr<SqliteConnection> c
connection->DesFinalize(); connection->DesFinalize();
if (connection == writeConnection_) { if (connection == writeConnection_) {
ReleaseWriteConnection(); ReleaseWriteConnection();
connection->TryCheckPoint();
} else { } else {
ReleaseReadConnection(connection); ReleaseReadConnection(connection);
} }
@ -226,7 +227,6 @@ int SqliteConnectionPool::InnerReOpenReadConnections()
return errCode; return errCode;
} }
int SqliteConnectionPool::ReOpenAvailableReadConnections() int SqliteConnectionPool::ReOpenAvailableReadConnections()
{ {
std::unique_lock<std::mutex> lock(readMutex_); std::unique_lock<std::mutex> lock(readMutex_);

View File

@ -103,7 +103,6 @@ if (is_ohos && !build_ohos_sdk) {
deps += [ "//third_party/sqlite:sqlite" ] deps += [ "//third_party/sqlite:sqlite" ]
ldflags = [ "-Wl,--exclude-libs,ALL" ] ldflags = [ "-Wl,--exclude-libs,ALL" ]
cflags_cc = [ "-fvisibility=hidden" ] cflags_cc = [ "-fvisibility=hidden" ]
sources += [ sources += [
"${relational_store_native_path}/rdb/src/abs_shared_result_set.cpp", "${relational_store_native_path}/rdb/src/abs_shared_result_set.cpp",
"${relational_store_native_path}/rdb/src/delay_notify.cpp", "${relational_store_native_path}/rdb/src/delay_notify.cpp",

View File

@ -175,7 +175,6 @@ HWTEST_F(RdbTransactionTest, RdbStore_Transaction_002, TestSize.Level1)
EXPECT_EQ(deletedRows, 3); EXPECT_EQ(deletedRows, 3);
} }
/** /**
* @tc.name: RdbStore_Transaction_003 * @tc.name: RdbStore_Transaction_003
* @tc.desc: test RdbStore BaseTransaction * @tc.desc: test RdbStore BaseTransaction
@ -536,7 +535,7 @@ HWTEST_F(RdbTransactionTest, RdbStore_BatchInsert_003, TestSize.Level1)
number = INT_MIN; number = INT_MIN;
error = store->BatchInsert(number, "test", valuesBuckets); error = store->BatchInsert(number, "test", valuesBuckets);
EXPECT_EQ(E_OK, error); EXPECT_EQ(E_OK, error);
EXPECT_EQ(-1, number); EXPECT_EQ(50, number);
resultSet = store->QuerySql("SELECT * FROM test"); resultSet = store->QuerySql("SELECT * FROM test");
resultSet->GetRowCount(rowCount); resultSet->GetRowCount(rowCount);
@ -551,37 +550,4 @@ HWTEST_F(RdbTransactionTest, RdbStore_BatchInsert_003, TestSize.Level1)
} }
resultSet->Close(); resultSet->Close();
EXPECT_EQ(100, number); EXPECT_EQ(100, number);
}
/**
* @tc.name: RdbStore_BatchInsert_004
* @tc.desc: Abnormal testCase of transaction for batchInsert, if batchInsert in transaction
* @tc.type: FUNC
*/
HWTEST_F(RdbTransactionTest, RdbStore_BatchInsert_004, TestSize.Level1)
{
std::shared_ptr<RdbStore> &store = RdbTransactionTest::store;
store->ExecuteSql("delete from test");
int id = 0;
ValuesBucket values;
std::vector<ValuesBucket> valuesBuckets;
for (int i = 0; i < 10; i++) {
values.Clear();
values.PutInt("id", id + i);
values.PutString("name", "zhangsan");
valuesBuckets.push_back(values);
}
int error = store->BeginTransaction();
EXPECT_EQ(E_OK, error);
int64_t number = 0;
error = store->BatchInsert(number, "test", valuesBuckets);
EXPECT_EQ(E_TRANSACTION_IN_EXECUTE, error);
EXPECT_EQ(0, number);
error = store->Commit();
EXPECT_EQ(E_OK, error);
} }

View File

@ -39,7 +39,7 @@ public:
static void MakeWalNoReachLimit(); static void MakeWalNoReachLimit();
static void MakeWalIncrease(); static void MakeWalIncrease();
static void KeepReadConnection(); static void KeepReadConnection();
static ValuesBucket MakeValueBucket(); static ValuesBucket MakeValueBucket(const int &id);
static const std::string DATABASE_NAME; static const std::string DATABASE_NAME;
static std::shared_ptr<RdbStore> store; static std::shared_ptr<RdbStore> store;
@ -52,7 +52,6 @@ std::shared_ptr<ResultSet> RdbWalLimitTest::resultSet = nullptr;
// create 1M data // create 1M data
std::vector<uint8_t> blobValue = RdbWalLimitTest::CreateRandomData(1 * 1024 * 1024); std::vector<uint8_t> blobValue = RdbWalLimitTest::CreateRandomData(1 * 1024 * 1024);
ValuesBucket values = RdbWalLimitTest::MakeValueBucket();
class RdbWalLimitCallback : public RdbOpenCallback { class RdbWalLimitCallback : public RdbOpenCallback {
public: public:
@ -184,10 +183,10 @@ void RdbWalLimitTest::MakeWalNoReachLimit()
} }
} }
ValuesBucket RdbWalLimitTest::MakeValueBucket() ValuesBucket RdbWalLimitTest::MakeValueBucket(const int &id)
{ {
ValuesBucket values; ValuesBucket values;
values.PutInt("id", 200); values.PutInt("id", id);
values.PutString("name", std::string("lisi")); values.PutString("name", std::string("lisi"));
values.PutInt("age", 18); values.PutInt("age", 18);
values.PutDouble("salary", 200.8); values.PutDouble("salary", 200.8);
@ -197,7 +196,8 @@ ValuesBucket RdbWalLimitTest::MakeValueBucket()
/** /**
* @tc.name: RdbStore_WalOverLimit_001 * @tc.name: RdbStore_WalOverLimit_001
* @tc.desc: Without reading data, the WAL size will not over default limit if write data continuously. * @tc.desc: Without reading data or conducting transactions, if data is continuously written,
* the WAL size will not exceed the default limit.
* @tc.type: FUNC * @tc.type: FUNC
* @tc.acquire: AR000HR0G5 * @tc.acquire: AR000HR0G5
*/ */
@ -206,19 +206,14 @@ HWTEST_F(RdbWalLimitTest, RdbStore_WalOverLimit_001, TestSize.Level1)
MakeWalIncrease(); MakeWalIncrease();
int64_t id; int64_t id;
ValuesBucket values = MakeValueBucket(199);
values.PutBlob("blobType", blobValue);
EXPECT_EQ(store->Insert(id, "test", values), E_OK); EXPECT_EQ(store->Insert(id, "test", values), E_OK);
int changedRows;
EXPECT_EQ(store->Update(changedRows, "test", values, "id = ?", std::vector<std::string>{ "200" }), E_OK);
EXPECT_EQ(store->Replace(id, "test", values), E_OK);
int deletedRows;
EXPECT_EQ(store->Delete(deletedRows, "test", "id = 200"), E_OK);
std::vector<ValuesBucket> valuesBuckets; std::vector<ValuesBucket> valuesBuckets;
for (int i = 0; i < 10; i++) { for (int i = 200; i < 210; i++) {
valuesBuckets.push_back(values); valuesBuckets.push_back(RdbWalLimitTest::MakeValueBucket(i));
} }
int64_t insertNum = 0; int64_t insertNum = 0;
@ -229,7 +224,7 @@ HWTEST_F(RdbWalLimitTest, RdbStore_WalOverLimit_001, TestSize.Level1)
/** /**
* @tc.name: RdbStore_WalOverLimit_002 * @tc.name: RdbStore_WalOverLimit_002
* @tc.desc: While reading data and writing data continuously if the WAL size not over default limit. * @tc.desc: Before the wal file exceeds the limit, both read and write can be executed normally.
* @tc.type: FUNC * @tc.type: FUNC
* @tc.acquire: AR000HR0G5 * @tc.acquire: AR000HR0G5
*/ */
@ -237,19 +232,11 @@ HWTEST_F(RdbWalLimitTest, RdbStore_WalOverLimit_002, TestSize.Level1)
{ {
KeepReadConnection(); KeepReadConnection();
MakeWalNoReachLimit(); MakeWalNoReachLimit();
ValuesBucket values = MakeValueBucket(20);
int changedRows;
EXPECT_EQ(store->Update(changedRows, "test", values, "id = ?", std::vector<std::string>{ "21" }), E_OK);
int64_t id;
EXPECT_EQ(store->Replace(id, "test", values), E_OK);
int deletedRows;
EXPECT_EQ(store->Delete(deletedRows, "test", "id = 21"), E_OK);
std::vector<ValuesBucket> valuesBuckets; std::vector<ValuesBucket> valuesBuckets;
for (int i = 0; i < 10; i++) { for (int i = 21; i < 30; i++) {
valuesBuckets.push_back(values); valuesBuckets.push_back(MakeValueBucket(i));
} }
int64_t insertNum = 0; int64_t insertNum = 0;
@ -260,116 +247,16 @@ HWTEST_F(RdbWalLimitTest, RdbStore_WalOverLimit_002, TestSize.Level1)
/** /**
* @tc.name: RdbStore_WalOverLimit_003 * @tc.name: RdbStore_WalOverLimit_003
* @tc.desc: While reading data, can not write data continuously if the WAL size over default limit. * @tc.desc: During transactions, the size of the wal file may exceed the limit.
* @tc.type: FUNC * @tc.type: FUNC
* @tc.acquire: AR000HR0G5 * @tc.acquire: AR000HR0G5
*/ */
HWTEST_F(RdbWalLimitTest, RdbStore_WalOverLimit_003, TestSize.Level3) HWTEST_F(RdbWalLimitTest, RdbStore_WalOverLimit_003, TestSize.Level3)
{ {
KeepReadConnection(); ValuesBucket values = MakeValueBucket(200);
int64_t id;
store->BeginTransaction();
MakeWalReachLimit(); MakeWalReachLimit();
int64_t id;
EXPECT_EQ(store->Insert(id, "test", values), E_WAL_SIZE_OVER_LIMIT); EXPECT_EQ(store->Insert(id, "test", values), E_WAL_SIZE_OVER_LIMIT);
store->Commit();
EXPECT_EQ(store->BeginTransaction(), E_WAL_SIZE_OVER_LIMIT); }
int changedRows;
EXPECT_EQ(store->Update(changedRows, "test", values, "id = ?", std::vector<std::string>{ "200" }),
E_WAL_SIZE_OVER_LIMIT);
EXPECT_EQ(store->Replace(id, "test", values), E_WAL_SIZE_OVER_LIMIT);
int deletedRows;
EXPECT_EQ(store->Delete(deletedRows, "test", "id = 200"), E_WAL_SIZE_OVER_LIMIT);
std::vector<ValuesBucket> valuesBuckets;
for (int i = 0; i < 2; i++) {
valuesBuckets.push_back(values);
}
int64_t insertNum = 0;
EXPECT_EQ(store->BatchInsert(insertNum, "test", valuesBuckets), E_WAL_SIZE_OVER_LIMIT);
EXPECT_EQ(store->ExecuteSql("DELETE FROM test"), E_WAL_SIZE_OVER_LIMIT);
int64_t outLong;
EXPECT_EQ(store->ExecuteAndGetLong(outLong, "DELETE FROM test"), E_WAL_SIZE_OVER_LIMIT);
}
/**
* @tc.name: RdbStore_WalOverLimit_004
* @tc.desc: Writing data after closing read connection.
* @tc.type: FUNC
* @tc.acquire: AR000HR0G5
*/
HWTEST_F(RdbWalLimitTest, RdbStore_WalOverLimit_004, TestSize.Level3)
{
KeepReadConnection();
MakeWalReachLimit();
int64_t id;
EXPECT_EQ(store->Insert(id, "test", values), E_WAL_SIZE_OVER_LIMIT);
EXPECT_EQ(resultSet->Close(), E_OK);
{
store->BeginTransaction();
EXPECT_EQ(store->Insert(id, "test", values), E_OK);
store->Commit();
}
int changedRows;
EXPECT_EQ(store->Update(changedRows, "test", values, "id = ?", std::vector<std::string>{ "200" }), E_OK);
EXPECT_EQ(store->Replace(id, "test", values), E_OK);
int deletedRows;
EXPECT_EQ(store->Delete(deletedRows, "test", "id = 200"), E_OK);
std::vector<ValuesBucket> valuesBuckets;
for (int i = 0; i < 2; i++) {
valuesBuckets.push_back(values);
}
int64_t insertNum = 0;
EXPECT_EQ(store->BatchInsert(insertNum, "test", valuesBuckets), E_OK);
EXPECT_EQ(store->ExecuteSql("DELETE FROM test"), E_OK);
}
/**
* @tc.name: RdbStore_WalOverLimit_005
* @tc.desc: While reading data and transaction will fail if the WAL file size over default size.
* @tc.type: FUNC
* @tc.acquire: AR000HR0G5
*/
HWTEST_F(RdbWalLimitTest, RdbStore_WalOverLimit_005, TestSize.Level3)
{
KeepReadConnection();
int64_t id;
{
store->BeginTransaction();
MakeWalReachLimit();
EXPECT_EQ(store->Insert(id, "test", values), E_WAL_SIZE_OVER_LIMIT);
store->Commit();
}
int changedRows;
EXPECT_EQ(store->Update(changedRows, "test", values, "id = ?", std::vector<std::string>{ "200" }),
E_WAL_SIZE_OVER_LIMIT);
EXPECT_EQ(store->Replace(id, "test", values), E_WAL_SIZE_OVER_LIMIT);
int deletedRows;
EXPECT_EQ(store->Delete(deletedRows, "test", "id = 200"),
E_WAL_SIZE_OVER_LIMIT);
std::vector<ValuesBucket> valuesBuckets;
for (int i = 0; i < 2; i++) {
valuesBuckets.push_back(values);
}
int64_t insertNum = 0;
EXPECT_EQ(store->BatchInsert(insertNum, "test", valuesBuckets), E_WAL_SIZE_OVER_LIMIT);
EXPECT_EQ(store->ExecuteSql("DELETE FROM test"), E_WAL_SIZE_OVER_LIMIT);
}