!1725 Restore后重新注册数据变化通知

Merge pull request !1725 from lott14/OpenHarmony-5.0-Release
This commit is contained in:
openharmony_ci 2024-09-23 09:13:16 +00:00 committed by Gitee
commit 2d623e7f7c
No known key found for this signature in database
GPG Key ID: 173E9B9CA92EEF8F
10 changed files with 65 additions and 13 deletions

View File

@ -26,13 +26,13 @@
namespace OHOS::NativeRdb {
class DelayNotify {
public:
using Task = std::function<int(const DistributedRdb::RdbChangedData &, uint32_t)>;
using Task = std::function<int(const DistributedRdb::RdbChangedData &, const DistributedRdb::RdbNotifyConfig &)>;
using Time = std::chrono::steady_clock::time_point;
DelayNotify();
~DelayNotify();
void SetExecutorPool(std::shared_ptr<ExecutorPool> pool);
void SetTask(Task task);
void UpdateNotify(const DistributedRdb::RdbChangedData &changedData);
void UpdateNotify(const DistributedRdb::RdbChangedData &changedData, bool isFull = false);
void SetAutoSyncInterval(uint32_t autoSyncInterval);
void Pause();
void Resume();
@ -41,6 +41,7 @@ private:
static constexpr uint32_t MAX_NOTIFY_INTERVAL = 5000;
static constexpr uint32_t SERVICE_INTERVAL = 10000;
bool isInitialized_ = false;
bool isFull_ = false;
Time lastTimePoint_;
std::atomic_int32_t pauseCount_;
ExecutorPool::TaskId delaySyncTaskId_ = ExecutorPool::INVALID_TASK_ID;

View File

@ -72,7 +72,7 @@ public:
int32_t Delete(const RdbSyncerParam &param) override;
int32_t NotifyDataChange(const RdbSyncerParam& param, const RdbChangedData &clientChangedData,
uint32_t delay = 0) override;
const RdbNotifyConfig &rdbNotifyConfig) override;
int32_t SetSearchable(const RdbSyncerParam& param, bool isSearchable) override;

View File

@ -266,6 +266,7 @@ private:
std::pair<int32_t, ValueObject> ExecuteEntry(const std::string& sql, const std::vector<ValueObject>& bindArgs,
int64_t trxId);
int GetDestPath(const std::string &backupPath, std::string &destPath);
void NotifyDataChange();
void ReportDbRestoreSuccessEvent();
static constexpr char SCHEME_RDB[] = "rdb://";

View File

@ -26,6 +26,7 @@
namespace OHOS::ITypesUtil {
using SubOption = DistributedRdb::SubscribeOption;
using SyncerParam = DistributedRdb::RdbSyncerParam;
using NotifyConfig = DistributedRdb::RdbNotifyConfig;
using Option = DistributedRdb::RdbService::Option;
using RdbPredicates = DistributedRdb::PredicatesMemo;
using RdbOperation = DistributedRdb::RdbPredicateOperation;
@ -49,6 +50,10 @@ API_EXPORT bool Marshalling(const SyncerParam &input, MessageParcel &data);
template<>
API_EXPORT bool Unmarshalling(SyncerParam &output, MessageParcel &data);
template<>
API_EXPORT bool Marshalling(const NotifyConfig &input, MessageParcel &data);
template<>
API_EXPORT bool Unmarshalling(NotifyConfig &output, MessageParcel &data);
template<>
API_EXPORT bool Marshalling(const Option &input, MessageParcel &data);
template<>
API_EXPORT bool Unmarshalling(Option &output, MessageParcel &data);

View File

@ -30,14 +30,17 @@ DelayNotify::~DelayNotify()
pool_->Remove(delaySyncTaskId_);
}
if (task_ != nullptr && changedData_.tableData.size() > 0) {
auto errCode = task_(changedData_, 0);
DistributedRdb::RdbNotifyConfig rdbNotifyConfig;
rdbNotifyConfig.delay_ = 0;
rdbNotifyConfig.isFull_ = isFull_;
auto errCode = task_(changedData_, rdbNotifyConfig);
if (errCode != 0) {
LOG_ERROR("NotifyDataChange is failed, err is %{public}d.", errCode);
}
}
}
void DelayNotify::UpdateNotify(const DistributedRdb::RdbChangedData &changedData)
void DelayNotify::UpdateNotify(const DistributedRdb::RdbChangedData &changedData, bool isFull)
{
LOG_DEBUG("Update changed data.");
{
@ -51,6 +54,7 @@ void DelayNotify::UpdateNotify(const DistributedRdb::RdbChangedData &changedData
changedData_.tableData.insert_or_assign(k, v);
}
}
isFull_ |= isFull;
}
StartTimer();
}
@ -72,9 +76,11 @@ void DelayNotify::StartTimer()
{
DistributedRdb::RdbChangedData changedData;
bool needExecTask = false;
bool isFull = false;
{
std::lock_guard<std::mutex> lock(mutex_);
changedData.tableData = changedData_.tableData;
isFull = isFull_;
if (pool_ == nullptr) {
return;
}
@ -106,7 +112,10 @@ void DelayNotify::StartTimer()
}
if (needExecTask) {
task_(changedData, SERVICE_INTERVAL);
DistributedRdb::RdbNotifyConfig rdbNotifyConfig;
rdbNotifyConfig.delay_ = SERVICE_INTERVAL;
rdbNotifyConfig.isFull_ = isFull;
task_(changedData, rdbNotifyConfig);
}
}
@ -122,14 +131,20 @@ void DelayNotify::ExecuteTask()
{
LOG_DEBUG("Notify data change.");
DistributedRdb::RdbChangedData changedData;
bool isFull = false;
{
std::lock_guard<std::mutex> lock(mutex_);
changedData.tableData = std::move(changedData_.tableData);
isFull = isFull_;
RestoreDefaultSyncInterval();
StopTimer();
isFull_ = false;
}
if (task_ != nullptr && changedData.tableData.size() > 0) {
int errCode = task_(changedData, 0);
if (task_ != nullptr && (changedData.tableData.size() > 0 || isFull)) {
DistributedRdb::RdbNotifyConfig rdbNotifyConfig;
rdbNotifyConfig.delay_ = 0;
rdbNotifyConfig.isFull_ = isFull;
int errCode = task_(changedData, rdbNotifyConfig);
if (errCode != 0) {
LOG_ERROR("NotifyDataChange is failed, err is %{public}d.", errCode);
std::lock_guard<std::mutex> lock(mutex_);

View File

@ -512,11 +512,11 @@ int32_t RdbServiceProxy::SetSearchable(const RdbSyncerParam& param, bool isSearc
}
int32_t RdbServiceProxy::NotifyDataChange(const RdbSyncerParam &param, const RdbChangedData &rdbChangedData,
uint32_t delay)
const RdbNotifyConfig &rdbNotifyConfig)
{
MessageParcel reply;
int32_t status = IPC_SEND(static_cast<uint32_t>(RdbServiceCode::RDB_SERVICE_CMD_NOTIFY_DATA_CHANGE),
reply, param, rdbChangedData, delay);
reply, param, rdbChangedData, rdbNotifyConfig);
if (status != RDB_OK) {
LOG_ERROR("RdbServiceProxy NotifyDataChange fail, status:%{public}d, "
"bundleName:%{public}s, storeName:%{public}s",

View File

@ -68,6 +68,7 @@ namespace OHOS::NativeRdb {
using namespace OHOS::Rdb;
using namespace std::chrono;
using SqlStatistic = DistributedRdb::SqlStatistic;
using RdbNotifyConfig = DistributedRdb::RdbNotifyConfig;
#if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM) && !defined(ANDROID_PLATFORM) && !defined(IOS_PLATFORM)
using RdbMgr = DistributedRdb::RdbManagerImpl;
#endif
@ -808,6 +809,18 @@ std::shared_ptr<AbsSharedResultSet> RdbStoreImpl::QuerySql(const std::string &sq
}
return std::make_shared<SqliteSharedResultSet>(connectionPool_, path_, sql, bindArgs);
}
void RdbStoreImpl::NotifyDataChange()
{
int errCode = RegisterDataChangeCallback();
if (errCode != E_OK) {
LOG_ERROR("RegisterDataChangeCallback is failed, err is %{public}d.", errCode);
}
DistributedRdb::RdbChangedData rdbChangedData;
if (delayNotifier_ != nullptr) {
delayNotifier_->UpdateNotify(rdbChangedData, true);
}
}
#endif
#if defined(WINDOWS_PLATFORM) || defined(MAC_PLATFORM) || defined(ANDROID_PLATFORM) || defined(IOS_PLATFORM)
@ -1776,6 +1789,7 @@ int RdbStoreImpl::Restore(const std::string &backupPath, const std::vector<uint8
#endif
int errCode = connectionPool_->ChangeDbFileForRestore(path_, destPath, newKey, slaveStatus_);
#if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM) && !defined(ANDROID_PLATFORM) && !defined(IOS_PLATFORM)
NotifyDataChange();
SecurityPolicy::SetSecurityLabel(config_);
if (service != nullptr) {
service->Enable(syncerParam_);
@ -2262,7 +2276,7 @@ void RdbStoreImpl::InitDelayNotifier()
}
delayNotifier_->SetExecutorPool(pool_);
delayNotifier_->SetTask([param = syncerParam_]
(const DistributedRdb::RdbChangedData& rdbChangedData, uint32_t delay) -> int {
(const DistributedRdb::RdbChangedData& rdbChangedData, const RdbNotifyConfig& rdbNotifyConfig) -> int {
auto [errCode, service] = DistributedRdb::RdbManagerImpl::GetInstance().GetRdbService(param);
if (errCode == E_NOT_SUPPORT) {
return errCode;
@ -2271,7 +2285,7 @@ void RdbStoreImpl::InitDelayNotifier()
LOG_ERROR("GetRdbService is failed, err is %{public}d.", errCode);
return errCode;
}
return service->NotifyDataChange(param, rdbChangedData, delay);
return service->NotifyDataChange(param, rdbChangedData, rdbNotifyConfig);
});
}

View File

@ -30,6 +30,17 @@ bool Unmarshalling(SyncerParam &output, MessageParcel &data)
output.isSearchable_, output.haMode_);
}
template<>
bool Marshalling(const NotifyConfig &input, MessageParcel &data)
{
return ITypesUtil::Marshal(data, input.delay_, input.isFull_);
}
template<>
bool Unmarshalling(NotifyConfig &output, MessageParcel &data)
{
return ITypesUtil::Unmarshal(data, output.delay_, output.isFull_);
}
template<>
bool Marshalling(const Option &input, MessageParcel &data)
{

View File

@ -74,7 +74,7 @@ public:
const RdbSyncerParam &param, const PredicatesMemo &predicates, const std::vector<std::string> &columns) = 0;
virtual int32_t NotifyDataChange(
const RdbSyncerParam &param, const RdbChangedData &rdbChangedData, uint32_t delay = 0) = 0;
const RdbSyncerParam &param, const RdbChangedData &rdbChangedData, const RdbNotifyConfig &rdbNotifyConfig) = 0;
virtual int32_t SetSearchable(const RdbSyncerParam& param, bool isSearchable) = 0;

View File

@ -56,6 +56,11 @@ struct RdbSyncerParam {
};
};
struct RdbNotifyConfig {
uint32_t delay_ = 0;
bool isFull_ = false;
};
enum SyncMode {
PUSH,
PULL,