mirror of
https://gitee.com/openharmony/distributeddatamgr_relational_store
synced 2024-11-27 01:01:02 +00:00
commit
ce25efd607
@ -66,19 +66,17 @@ public:
|
||||
|
||||
private:
|
||||
struct ConnNode {
|
||||
static constexpr uint32_t CHECK_POINT_INTERVAL = 5; // 5 min
|
||||
bool using_ = false;
|
||||
int32_t tid_ = 0;
|
||||
int32_t id_ = 0;
|
||||
std::chrono::steady_clock::time_point time_ = std::chrono::steady_clock::now();
|
||||
std::chrono::steady_clock::time_point failedTime_;
|
||||
std::shared_ptr<Connection> connect_;
|
||||
|
||||
explicit ConnNode(std::shared_ptr<Connection> conn);
|
||||
std::shared_ptr<Connection> GetConnect();
|
||||
int64_t GetUsingTime() const;
|
||||
bool IsWriter() const;
|
||||
int32_t Unused(int32_t count);
|
||||
int32_t Unused(int32_t count, bool timeout);
|
||||
};
|
||||
|
||||
struct Container {
|
||||
@ -127,6 +125,7 @@ private:
|
||||
int RestoreMasterDb(const std::string &newPath, const std::string &backupPath);
|
||||
bool CheckIntegrity(const std::string &dbPath);
|
||||
|
||||
static constexpr uint32_t CHECK_POINT_INTERVAL = 5; // 5 min
|
||||
static constexpr int LIMITATION = 1024;
|
||||
static constexpr uint32_t ITER_V1 = 5000;
|
||||
static constexpr uint32_t ITERS_COUNT = 2;
|
||||
@ -144,6 +143,7 @@ private:
|
||||
bool transactionUsed_;
|
||||
std::atomic<bool> isInTransaction_ = false;
|
||||
std::atomic<uint32_t> transCount_ = 0;
|
||||
std::atomic<std::chrono::steady_clock::time_point> failedTime_;
|
||||
};
|
||||
|
||||
} // namespace NativeRdb
|
||||
|
@ -138,6 +138,7 @@ private:
|
||||
};
|
||||
static constexpr const char *MERGE_ASSETS_FUNC = "merge_assets";
|
||||
static constexpr const char *MERGE_ASSET_FUNC = "merge_asset";
|
||||
static constexpr int CHECKPOINT_TIME = 1000;
|
||||
static constexpr int DEFAULT_BUSY_TIMEOUT_MS = 2000;
|
||||
static constexpr int BACKUP_PAGES_PRE_STEP = 12800; // 1024 * 4 * 12800 == 50m
|
||||
static constexpr int BACKUP_PRE_WAIT_TIME = 10;
|
||||
|
@ -16,7 +16,6 @@
|
||||
#include "connection_pool.h"
|
||||
|
||||
#include <base_transaction.h>
|
||||
|
||||
#include <condition_variable>
|
||||
#include <iterator>
|
||||
#include <mutex>
|
||||
@ -181,10 +180,10 @@ std::shared_ptr<Connection> ConnPool::Convert2AutoConn(std::shared_ptr<ConnNode>
|
||||
if (realPool == nullptr) {
|
||||
return;
|
||||
}
|
||||
realPool->ReleaseNode(node, !isTrans);
|
||||
if (isTrans) {
|
||||
realPool->transCount_--;
|
||||
}
|
||||
realPool->ReleaseNode(node, !isTrans);
|
||||
node = nullptr;
|
||||
});
|
||||
}
|
||||
@ -295,16 +294,24 @@ SharedConn ConnPool::AcquireRef(bool isReadOnly, std::chrono::milliseconds ms)
|
||||
});
|
||||
}
|
||||
|
||||
void ConnPool::ReleaseNode(std::shared_ptr<ConnNode> node, bool reuse)
|
||||
void ConnPool::ReleaseNode(std::shared_ptr<ConnNode> node, bool reuse)
|
||||
{
|
||||
if (node == nullptr) {
|
||||
return;
|
||||
}
|
||||
|
||||
auto now = steady_clock::now();
|
||||
auto timeout = now > (failedTime_.load() + minutes(CHECK_POINT_INTERVAL)) || now < failedTime_.load();
|
||||
auto transCount = transCount_ + isInTransaction_;
|
||||
auto errCode = node->Unused(transCount);
|
||||
auto remainCount = reuse ? transCount : transCount - 1;
|
||||
auto errCode = node->Unused(remainCount, timeout);
|
||||
if (errCode == E_SQLITE_LOCKED || errCode == E_SQLITE_BUSY) {
|
||||
writers_.Dump("WAL writers_", transCount);
|
||||
readers_.Dump("WAL readers_", transCount);
|
||||
}
|
||||
|
||||
if (node->IsWriter() && (errCode != E_INNER_WARNING && errCode != E_NOT_SUPPORT)) {
|
||||
failedTime_ = errCode != E_OK ? now : steady_clock::time_point();
|
||||
}
|
||||
|
||||
auto &container = node->IsWriter() ? writers_ : readers_;
|
||||
@ -491,28 +498,22 @@ int64_t ConnPool::ConnNode::GetUsingTime() const
|
||||
return duration_cast<milliseconds>(time).count();
|
||||
}
|
||||
|
||||
int32_t ConnPool::ConnNode::Unused(int32_t count)
|
||||
int32_t ConnPool::ConnNode::Unused(int32_t count, bool timeout)
|
||||
{
|
||||
time_ = steady_clock::now();
|
||||
if (connect_ == nullptr) {
|
||||
return E_OK;
|
||||
}
|
||||
|
||||
connect_->ClearCache();
|
||||
int32_t errCode = E_INNER_WARNING;
|
||||
if (count <= 0) {
|
||||
errCode = connect_->TryCheckPoint(timeout);
|
||||
}
|
||||
|
||||
time_ = steady_clock::now();
|
||||
if (!connect_->IsWriter()) {
|
||||
tid_ = 0;
|
||||
}
|
||||
|
||||
if (count > 0) {
|
||||
return E_OK;
|
||||
}
|
||||
auto timeout = time_ > (failedTime_ + minutes(CHECK_POINT_INTERVAL)) || time_ < failedTime_;
|
||||
int32_t errCode = connect_->TryCheckPoint(timeout);
|
||||
if (errCode == E_INNER_WARNING || errCode == E_NOT_SUPPORT) {
|
||||
return E_OK;
|
||||
}
|
||||
|
||||
failedTime_ = errCode != E_OK ? time_ : steady_clock::time_point();
|
||||
return errCode;
|
||||
}
|
||||
|
||||
|
@ -258,7 +258,6 @@ int SqliteConnection::InnerOpen(const RdbStoreConfig &config)
|
||||
}
|
||||
|
||||
if (isWriter_) {
|
||||
TryCheckPoint(true);
|
||||
ValueObject checkResult{"ok"};
|
||||
auto index = static_cast<uint32_t>(config.GetIntegrityCheck());
|
||||
if (index < static_cast<uint32_t>(sizeof(INTEGRITIES) / sizeof(INTEGRITIES[0]))) {
|
||||
@ -991,7 +990,9 @@ int SqliteConnection::TryCheckPoint(bool timeout)
|
||||
return E_INNER_WARNING;
|
||||
}
|
||||
|
||||
(void)sqlite3_busy_timeout(dbHandle_, CHECKPOINT_TIME);
|
||||
int errCode = sqlite3_wal_checkpoint_v2(dbHandle_, nullptr, SQLITE_CHECKPOINT_TRUNCATE, nullptr, nullptr);
|
||||
(void)sqlite3_busy_timeout(dbHandle_, DEFAULT_BUSY_TIMEOUT_MS);
|
||||
if (errCode != SQLITE_OK) {
|
||||
LOG_WARN("sqlite3_wal_checkpoint_v2 failed err:%{public}d,size:%{public}zd,wal:%{public}s.", errCode, size,
|
||||
SqliteUtils::Anonymous(walName).c_str());
|
||||
@ -1469,6 +1470,7 @@ int32_t SqliteConnection::Repair(const RdbStoreConfig &config)
|
||||
LOG_ERROR("reopen db failed, err:%{public}d", ret);
|
||||
return ret;
|
||||
}
|
||||
connection->TryCheckPoint(true);
|
||||
SlaveStatus curStatus;
|
||||
ret = connection->ExchangeSlaverToMaster(true, false, curStatus);
|
||||
if (ret != E_OK) {
|
||||
|
@ -337,15 +337,17 @@ describe('rdbStoreTest', function () {
|
||||
blobType: new Uint8Array(Array(1024 * 1024).fill(1)),
|
||||
})
|
||||
const middleTime = new Date().getTime();
|
||||
console.log(TAG + "testRdbStore0012, startTime:" + startTime + " middleTime:" + middleTime);
|
||||
|
||||
expect((middleTime - startTime) > 2000).assertTrue();
|
||||
expect((middleTime - startTime) > 1000).assertTrue();
|
||||
|
||||
rdbStore.insertSync('test', {
|
||||
blobType: new Uint8Array(Array(1024 * 1024).fill(1)),
|
||||
})
|
||||
const endTime = new Date().getTime();
|
||||
console.log(TAG + "testRdbStore0012, endTime:" + endTime + " middleTime:" + middleTime);
|
||||
|
||||
expect((endTime - middleTime) < 2000).assertTrue();
|
||||
expect((endTime - middleTime) < 1000).assertTrue();
|
||||
console.log(TAG + "************* testRdbStore0012 end *************");
|
||||
done();
|
||||
} catch (e) {
|
||||
|
Loading…
Reference in New Issue
Block a user