objectstore restore and sync

Signed-off-by: hanlu <hanlu1@huawei.com>
This commit is contained in:
hanlu
2022-02-08 19:37:52 +08:00
parent 1d11caf6ee
commit 5a1d761cfd
22 changed files with 432 additions and 250 deletions
+177
View File
@@ -0,0 +1,177 @@
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS
@@ -24,18 +24,24 @@
namespace OHOS::ObjectStore {
class WatcherProxy;
enum SyncStatus {
START,
FINISHED
};
class DistributedObjectStoreImpl : public DistributedObjectStore {
public:
DistributedObjectStoreImpl(FlatObjectStore *flatObjectStore);
~DistributedObjectStoreImpl() override;
uint32_t Get(const std::string &sessionId, DistributedObject *object) override;
DistributedObject *CreateObject(const std::string &sessionId) override;
uint32_t Sync(DistributedObject *object) override;
uint32_t DeleteObject(const std::string &sessionId) override;
uint32_t Watch(DistributedObject *object, std::shared_ptr<ObjectWatcher> watcher) override;
uint32_t UnWatch(DistributedObject *object) override;
void TriggerSync() override;
void TriggerRestore(std::function<void()> notifier) override;
private:
void UpdateStatus(SyncStatus status);
DistributedObjectImpl *CacheObject(const std::string &sessionId, FlatObjectStore *flatObjectStore);
FlatObjectStore *flatObjectStore_ = nullptr;
std::map<DistributedObject *, std::shared_ptr<WatcherProxy>> watchers_;
@@ -47,7 +53,6 @@ class WatcherProxy : public FlatObjectWatcher {
public:
WatcherProxy(const std::shared_ptr<ObjectWatcher> objectWatcher, const std::string &sessionId);
void OnChanged(const std::string &sessionid, const std::vector<std::string> &changedData) override;
void OnDeleted(const std::string &sessionid) override;
private:
std::shared_ptr<ObjectWatcher> objectWatcher_;
@@ -34,12 +34,10 @@ public:
uint32_t DeleteTable(const std::string &key) override;
uint32_t CreateTable(const std::string &key) override;
uint32_t GetTable(const std::string &key, std::map<std::string, Value> &result) override;
uint32_t UpdateItems(const std::string &key, std::map<std::string, Value> &data) override;
uint32_t UpdateItem(const std::string &key, const std::string &itemKey, Value &value) override;
uint32_t GetItem(const std::string &key, const std::string &itemKey, Value &value) override;
uint32_t RegisterObserver(const std::string &key, std::shared_ptr<TableWatcher> watcher) override;
uint32_t UnRegisterObserver(const std::string &key) override;
uint32_t ChangeKey(const std::string &oldKey, const std::string &newKey) override;
bool isOpened_ = false;
private:
@@ -29,7 +29,6 @@ public:
{
}
void OnChanged(const std::string &sessionid, const std::vector<std::string> &changedData) override;
void OnDeleted(const std::string &sessionid) override;
};
class FlatObjectStore {
@@ -34,7 +34,6 @@ public:
{
}
void OnChanged(const std::string &sessionid, const std::vector<std::string> &changedData) override;
void OnDeleted(const std::string &sessionid) override;
};
class ObjectStorageEngine {
@@ -50,12 +49,10 @@ public:
virtual uint32_t DeleteTable(const std::string &key) = 0;
virtual uint32_t CreateTable(const std::string &key) = 0;
virtual uint32_t GetTable(const std::string &key, std::map<std::string, Value> &result) = 0;
virtual uint32_t UpdateItems(const std::string &key, std::map<std::string, Value> &data) = 0;
virtual uint32_t UpdateItem(const std::string &key, const std::string &itemKey, Value &value) = 0;
virtual uint32_t GetItem(const std::string &key, const std::string &itemKey, Value &value) = 0;
virtual uint32_t RegisterObserver(const std::string &key, std::shared_ptr<TableWatcher> watcher) = 0;
virtual uint32_t UnRegisterObserver(const std::string &key) = 0;
virtual uint32_t ChangeKey(const std::string &oldKey, const std::string &newKey) = 0;
};
} // namespace OHOS::ObjectStore
#endif
@@ -27,7 +27,6 @@ public:
Watcher(const std::string &sessionId);
virtual ~Watcher() = default;
virtual void OnChanged(const std::string &sessionid, const std::vector<std::string> &changedData) = 0;
virtual void OnDeleted(const std::string &sessionid) = 0;
void OnChange(const DistributedDB::KvStoreChangedData &data) override;
@@ -56,7 +56,7 @@ uint32_t DistributedObjectImpl::PutDouble(const std::string &key, double value)
Type type = Type::TYPE_DOUBLE;
PutNum(&type, 0, sizeof(type), data);
PutNum(&value, sizeof(type), sizeof(value), data);
uint32_t status = flatObjectStore_->Put(sessionId_, key, data);
uint32_t status = flatObjectStore_->Put(sessionId_, FIELDS_PREFIX + key, data);
if (status != SUCCESS) {
LOG_ERROR("DistributedObjectImpl::PutDouble setField err %{public}d", status);
}
@@ -69,7 +69,7 @@ uint32_t DistributedObjectImpl::PutBoolean(const std::string &key, bool value)
Type type = Type::TYPE_BOOLEAN;
PutNum(&type, 0, sizeof(type), data);
PutNum(&value, sizeof(type), sizeof(value), data);
uint32_t status = flatObjectStore_->Put(sessionId_, key, data);
uint32_t status = flatObjectStore_->Put(sessionId_, FIELDS_PREFIX + key, data);
if (status != SUCCESS) {
LOG_ERROR("DistributedObjectImpl::PutBoolean setField err %{public}d", status);
}
@@ -83,7 +83,7 @@ uint32_t DistributedObjectImpl::PutString(const std::string &key, const std::str
PutNum(&type, 0, sizeof(type), data);
Bytes dst = StringUtils::StrToBytes(value);
data.insert(data.end(), dst.begin(), dst.end());
uint32_t status = flatObjectStore_->Put(sessionId_, key, data);
uint32_t status = flatObjectStore_->Put(sessionId_, FIELDS_PREFIX + key, data);
if (status != SUCCESS) {
LOG_ERROR("DistributedObjectImpl::PutString setField err %{public}d", status);
}
@@ -94,7 +94,7 @@ uint32_t DistributedObjectImpl::GetDouble(const std::string &key, double &value)
{
Bytes data;
Bytes keyBytes = StringUtils::StrToBytes(key);
uint32_t status = flatObjectStore_->Get(sessionId_, key, data);
uint32_t status = flatObjectStore_->Get(sessionId_, FIELDS_PREFIX + key, data);
if (status != SUCCESS) {
LOG_ERROR("DistributedObjectImpl:GetDouble field not exist. %{public}d %{public}s", status, key.c_str());
return status;
@@ -110,7 +110,7 @@ uint32_t DistributedObjectImpl::GetBoolean(const std::string &key, bool &value)
{
Bytes data;
Bytes keyBytes = StringUtils::StrToBytes(key);
uint32_t status = flatObjectStore_->Get(sessionId_, key, data);
uint32_t status = flatObjectStore_->Get(sessionId_, FIELDS_PREFIX + key, data);
if (status != SUCCESS) {
LOG_ERROR("DistributedObjectImpl:GetBoolean field not exist. %{public}d %{public}s", status, key.c_str());
return status;
@@ -126,7 +126,7 @@ uint32_t DistributedObjectImpl::GetBoolean(const std::string &key, bool &value)
uint32_t DistributedObjectImpl::GetString(const std::string &key, std::string &value)
{
Bytes data;
uint32_t status = flatObjectStore_->Get(sessionId_, key, data);
uint32_t status = flatObjectStore_->Get(sessionId_, FIELDS_PREFIX + key, data);
if (status != SUCCESS) {
LOG_ERROR("DistributedObjectImpl:GetString field not exist. %{public}d %{public}s", status, key.c_str());
return status;
@@ -141,7 +141,7 @@ uint32_t DistributedObjectImpl::GetString(const std::string &key, std::string &v
uint32_t DistributedObjectImpl::GetType(const std::string &key, Type &type)
{
Bytes data;
uint32_t status = flatObjectStore_->Get(sessionId_, key, data);
uint32_t status = flatObjectStore_->Get(sessionId_, FIELDS_PREFIX + key, data);
if (status != SUCCESS) {
LOG_ERROR("DistributedObjectImpl:GetString field not exist. %{public}d %{public}s", status, key.c_str());
return status;
@@ -13,12 +13,16 @@
* limitations under the License.
*/
#include <thread>
#include "distributed_object_impl.h"
#include "distributed_objectstore_impl.h"
#include "objectstore_errors.h"
#include "string_utils.h"
namespace OHOS::ObjectStore {
#define TO_STRING(input) #input
static constexpr char PROPERTY_STATUS_NAME[] = "status";
DistributedObjectStoreImpl::DistributedObjectStoreImpl(FlatObjectStore *flatObjectStore)
: flatObjectStore_(flatObjectStore)
{
@@ -55,19 +59,6 @@ DistributedObject *DistributedObjectStoreImpl::CreateObject(const std::string &s
return CacheObject(sessionId, flatObjectStore_);
}
uint32_t DistributedObjectStoreImpl::Sync(DistributedObject *object) // todo may delete
{
if (object == nullptr) {
LOG_ERROR("DistributedObjectStoreImpl::Sync object err ");
return ERR_NULL_OBJECT;
}
if (flatObjectStore_ == nullptr) {
LOG_ERROR("DistributedObjectStoreImpl::Sync object err ");
return ERR_NULL_OBJECTSTORE;
}
return SUCCESS;
}
uint32_t DistributedObjectStoreImpl::DeleteObject(const std::string &sessionId)
{
if (flatObjectStore_ == nullptr) {
@@ -141,6 +132,57 @@ uint32_t DistributedObjectStoreImpl::UnWatch(DistributedObject *object)
return SUCCESS;
}
void DistributedObjectStoreImpl::TriggerSync()
{
UpdateStatus(START);
}
void DistributedObjectStoreImpl::TriggerRestore(std::function<void()> notifier)
{
std::thread th = std::thread([&]() {
constexpr uint32_t RETRY_TIMES = 50;
uint32_t i = 0;
uint32_t status = ERR_DB_NOT_INIT;
while (i++ < RETRY_TIMES) {
bool isFinished = true;
std::string syncStatus;
{
std::unique_lock<std::shared_mutex> cacheLock(dataMutex_);
for (auto item : objects_) {
status = item->GetString(PROPERTY_STATUS_NAME, syncStatus);
if (status != SUCCESS || syncStatus != TO_STRING(START)) {
LOG_WARN("%{public}s not ready", item->GetSessionId().c_str());
isFinished = false;
break;
}
}
}
if (isFinished) {
status = SUCCESS;
break;
}
std::this_thread::sleep_for(std::chrono::microseconds(100));
}
LOG_WARN("restore result %{public}d", status);
notifier();
UpdateStatus(FINISHED);
});
th.detach();
return;
}
void DistributedObjectStoreImpl::UpdateStatus(SyncStatus status)
{
std::unique_lock<std::shared_mutex> cacheLock(dataMutex_);
std::string statusUpdated = TO_STRING(status);
LOG_INFO("update status to %{public}s", statusUpdated.c_str());
for (auto item : objects_) {
item->PutString(PROPERTY_STATUS_NAME, statusUpdated);
}
return;
}
WatcherProxy::WatcherProxy(const std::shared_ptr<ObjectWatcher> objectWatcher, const std::string &sessionId)
: FlatObjectWatcher(sessionId), objectWatcher_(objectWatcher)
{
@@ -151,11 +193,6 @@ void WatcherProxy::OnChanged(const std::string &sessionid, const std::vector<std
objectWatcher_->OnChanged(sessionid, changedData);
}
void WatcherProxy::OnDeleted(const std::string &sessionid)
{
objectWatcher_->OnDeleted(sessionid);
}
DistributedObjectStore *DistributedObjectStore::GetInstance(const std::string &bundleName)
{
static char instMemory[sizeof(DistributedObjectStoreImpl)];
@@ -40,19 +40,19 @@ uint32_t FlatObjectStorageEngine::Open(const std::string &bundleName)
auto status = DistributedDB::KvStoreDelegateManager::SetProcessLabel("objectstoreDB", bundleName);
if (status != DistributedDB::DBStatus::OK) {
LOG_ERROR("delegate SetProcessLabel failed: %{public}d.", static_cast<int>(status));
return SUCCESS;
return ERR_DB_SET_PROCESS;
}
auto communicator = std::make_shared<ProcessCommunicatorImpl>();
auto commStatus = DistributedDB::KvStoreDelegateManager::SetProcessCommunicator(communicator);
if (commStatus != DistributedDB::DBStatus::OK) {
LOG_ERROR("set distributed db communicator failed.");
return SUCCESS;
return ERR_DB_SET_PROCESS;
}
storeManager_ = std::make_shared<DistributedDB::KvStoreDelegateManager>(bundleName, "default");
if (storeManager_ == nullptr) {
LOG_ERROR("FlatObjectStorageEngine::make shared fail");
return ERR_MOMEM;
return ERR_NOMEM;
}
isOpened_ = true;
LOG_INFO("FlatObjectDatabase::Open Succeed");
@@ -104,7 +104,7 @@ uint32_t FlatObjectStorageEngine::CreateTable(const std::string &key)
status = kvStore->Pragma(DistributedDB::AUTO_SYNC, data);
if (status != DistributedDB::DBStatus::OK) {
LOG_ERROR("FlatObjectStorageEngine::CreateTable %{public}s getkvstore fail[%{public}d]", key.c_str(), status);
return ERR_DE_GETKV_FAIL;
return ERR_DB_GETKV_FAIL;
}
LOG_INFO("create table %{public}s success", key.c_str());
{
@@ -123,7 +123,7 @@ uint32_t FlatObjectStorageEngine::GetTable(const std::string &key, std::map<std:
std::unique_lock<std::shared_mutex> lock(operationMutex_);
if (delegates_.count(key) == 0) {
LOG_INFO("FlatObjectStorageEngine::GetTable %{public}s not exist", key.c_str());
return ERR_DE_NOT_EXIST;
return ERR_DB_NOT_EXIST;
}
result.clear();
DistributedDB::KvStoreResultSet *resultSet = nullptr;
@@ -148,36 +148,6 @@ uint32_t FlatObjectStorageEngine::GetTable(const std::string &key, std::map<std:
return SUCCESS;
}
uint32_t FlatObjectStorageEngine::UpdateItems(const std::string &key, std::map<std::string, Value> &data)
{
if (!isOpened_) {
return ERR_DB_NOT_INIT;
}
std::unique_lock<std::shared_mutex> lock(operationMutex_);
if (delegates_.count(key) == 0) {
LOG_INFO("FlatObjectStorageEngine::GetTable %{public}s not exist", key.c_str());
return ERR_DE_NOT_EXIST;
}
auto iter = data.begin();
std::vector<DistributedDB::Entry> items;
auto delegate = delegates_.at(key);
while (iter != data.end()) {
DistributedDB::Entry entry;
entry.key = StringUtils::StrToBytes(iter->first);
entry.value = iter->second;
items.insert(items.end(), entry);
iter++;
}
LOG_INFO("start PutBatch");
auto status = delegate->PutBatch(items);
if (status != DistributedDB::DBStatus::OK) {
LOG_ERROR("%{public}s PutBatch fail[%{public}d]", key.c_str(), status);
return ERR_CLOSE_STORAGE;
}
LOG_INFO("end PutBatch");
return SUCCESS;
}
uint32_t FlatObjectStorageEngine::UpdateItem(const std::string &key, const std::string &itemKey, Value &value)
{
if (!isOpened_) {
@@ -186,7 +156,7 @@ uint32_t FlatObjectStorageEngine::UpdateItem(const std::string &key, const std::
std::unique_lock<std::shared_mutex> lock(operationMutex_);
if (delegates_.count(key) == 0) {
LOG_INFO("FlatObjectStorageEngine::GetTable %{public}s not exist", key.c_str());
return ERR_DE_NOT_EXIST;
return ERR_DB_NOT_EXIST;
}
auto delegate = delegates_.at(key);
LOG_INFO("start Put");
@@ -195,7 +165,7 @@ uint32_t FlatObjectStorageEngine::UpdateItem(const std::string &key, const std::
LOG_ERROR("%{public}s PutBatch fail[%{public}d]", key.c_str(), status);
return ERR_CLOSE_STORAGE;
}
LOG_INFO("end Put");
LOG_INFO("put success");
return SUCCESS;
}
@@ -207,7 +177,7 @@ uint32_t FlatObjectStorageEngine::DeleteTable(const std::string &key)
std::unique_lock<std::shared_mutex> lock(operationMutex_);
if (delegates_.count(key) == 0) {
LOG_INFO("FlatObjectStorageEngine::GetTable %{public}s not exist", key.c_str());
return ERR_DE_NOT_EXIST;
return ERR_DB_NOT_EXIST;
}
LOG_INFO("start DeleteTable %{public}s", key.c_str());
auto status = storeManager_->CloseKvStore(delegates_.at(key));
@@ -216,7 +186,7 @@ uint32_t FlatObjectStorageEngine::DeleteTable(const std::string &key)
"FlatObjectStorageEngine::CloseKvStore %{public}s CloseKvStore fail[%{public}d]", key.c_str(), status);
return ERR_CLOSE_STORAGE;
}
LOG_INFO("end DeleteTable");
LOG_INFO("DeleteTable success");
delegates_.erase(key);
return SUCCESS;
}
@@ -229,7 +199,7 @@ uint32_t FlatObjectStorageEngine::GetItem(const std::string &key, const std::str
std::unique_lock<std::shared_mutex> lock(operationMutex_);
if (delegates_.count(key) == 0) {
LOG_ERROR("FlatObjectStorageEngine::GetItem %{public}s not exist", key.c_str());
return ERR_DE_NOT_EXIST;
return ERR_DB_NOT_EXIST;
}
LOG_INFO("start Get %{public}s", key.c_str());
DistributedDB::DBStatus status = delegates_.at(key)->Get(StringUtils::StrToBytes(itemKey), value);
@@ -250,7 +220,7 @@ uint32_t FlatObjectStorageEngine::RegisterObserver(const std::string &key, std::
std::unique_lock<std::shared_mutex> lock(operationMutex_);
if (delegates_.count(key) == 0) {
LOG_INFO("FlatObjectStorageEngine::RegisterObserver %{public}s not exist", key.c_str());
return ERR_DE_NOT_EXIST;
return ERR_DB_NOT_EXIST;
}
if (observerMap_.count(key) != 0) {
LOG_INFO("FlatObjectStorageEngine::RegisterObserver observer already exist.");
@@ -279,7 +249,7 @@ uint32_t FlatObjectStorageEngine::UnRegisterObserver(const std::string &key)
std::unique_lock<std::shared_mutex> lock(operationMutex_);
if (delegates_.count(key) == 0) {
LOG_INFO("FlatObjectStorageEngine::RegisterObserver %{public}s not exist", key.c_str());
return ERR_DE_NOT_EXIST;
return ERR_DB_NOT_EXIST;
}
auto iter = observerMap_.find(key);
if (iter == observerMap_.end()) {
@@ -299,33 +269,6 @@ uint32_t FlatObjectStorageEngine::UnRegisterObserver(const std::string &key)
return SUCCESS;
}
uint32_t FlatObjectStorageEngine::ChangeKey(const std::string &oldKey, const std::string &newKey)
{
if (!isOpened_) {
LOG_ERROR("FlatObjectStorageEngine::ChangeKey kvStore has not init");
return ERR_DB_NOT_INIT;
}
std::unique_lock<std::shared_mutex> lock(operationMutex_);
if (delegates_.count(oldKey) == 0) {
LOG_INFO("FlatObjectStorageEngine::ChangeKey oldKey %{public}s not exist", oldKey.c_str());
return ERR_DE_NOT_EXIST;
}
DistributedDB::KvStoreNbDelegate *delegate = delegates_.at(oldKey);
delegates_.erase(oldKey);
delegates_.insert_or_assign(newKey, delegate);
auto iter = observerMap_.find(oldKey);
if (iter != observerMap_.end()) {
UnRegisterObserver(oldKey);
std::shared_ptr<TableWatcher> watcher = iter->second;
uint32_t status = RegisterObserver(newKey, watcher);
if (status != DistributedDB::DBStatus::OK) {
LOG_ERROR("FlatObjectStorageEngine::ChangeKey watch err %{public}d", status);
return ERR_REGISTER;
}
}
return SUCCESS;
}
void Watcher::OnChange(const DistributedDB::KvStoreChangedData &data)
{
std::vector<std::string> changedData;
@@ -96,7 +96,7 @@ uint32_t FlatObjectStore::Put(const std::string &sessionId, const std::string &k
LOG_ERROR("FlatObjectStore::DB has not inited");
return ERR_DB_NOT_INIT;
}
return storageEngine_->UpdateItem(sessionId, FIELDS_PREFIX + key, value);
return storageEngine_->UpdateItem(sessionId, key, value);
}
uint32_t FlatObjectStore::Get(std::string &sessionId, const std::string &key, Bytes &value)
@@ -105,6 +105,6 @@ uint32_t FlatObjectStore::Get(std::string &sessionId, const std::string &key, By
LOG_ERROR("FlatObjectStore::DB has not inited");
return ERR_DB_NOT_INIT;
}
return storageEngine_->GetItem(sessionId, FIELDS_PREFIX + key, value);
return storageEngine_->GetItem(sessionId, key, value);
}
} // namespace OHOS::ObjectStore
@@ -143,9 +143,6 @@ void SoftBusAdapter::Init()
{
LOG_INFO("begin");
std::thread th = std::thread([&]() {
auto communicator = std::make_shared<ProcessCommunicatorImpl>();
auto retcom = DistributedDB::KvStoreDelegateManager::SetProcessCommunicator(communicator);
LOG_INFO("set communicator ret:%{public}d.", static_cast<int>(retcom));
int i = 0;
constexpr int RETRY_TIMES = 300;
while (i++ < RETRY_TIMES) {
@@ -20,17 +20,15 @@
#include "js_native_api.h"
#include "node_api.h"
namespace OHOS::ObjectStore {
constexpr size_t SESSION_ID_SIZE = 32;
class JSDistributedObjectStore {
public:
static napi_value JSCreateObjectSync(napi_env env, napi_callback_info info);
static napi_value JSDestroyObjectSync(napi_env env, napi_callback_info info);
static napi_value JSSync(napi_env env, napi_callback_info info);
static napi_value JSOn(napi_env env, napi_callback_info info);
static napi_value JSOff(napi_env env, napi_callback_info info);
static std::string GetBundleName(napi_env env);
private:
static std::string GetBundleName(napi_env env);
static napi_value NewDistributedObject(
napi_env env, DistributedObjectStore *objectStore, DistributedObject *object);
};
@@ -21,58 +21,98 @@
#include "napi/native_api.h"
#include "napi/native_node_api.h"
#include "uv_queue.h"
namespace OHOS::ObjectStore {
enum Event {
EVENT_UNKNOWN = -1,
EVENT_CHANGE,
EVENT_STATUS
};
class JSWatcher;
struct EventHandler {
napi_ref callbackRef = nullptr;
EventHandler *next = nullptr;
};
class EventListener {
public:
EventListener() : type_(nullptr), handlers_(nullptr)
EventListener() : handlers_(nullptr)
{
}
virtual ~EventListener()
{
}
bool Add(napi_env env, napi_value handler);
bool Del(napi_env env, napi_value handler);
void Clear(napi_env env);
const char *type_;
virtual bool Add(napi_env env, napi_value handler);
virtual bool Del(napi_env env, napi_value handler);
virtual void Clear(napi_env env);
EventHandler *Find(napi_env env, napi_value handler);
EventHandler *handlers_;
};
class ChangeEventListener : public EventListener {
public:
ChangeEventListener(JSWatcher *watcher, DistributedObjectStore *objectStore, DistributedObject *object);
bool Add(napi_env env, napi_value handler) override;
bool Del(napi_env env, napi_value handler) override;
void Clear(napi_env env) override;
private:
EventHandler *Find(napi_env env, napi_value handler);
bool isWatched_ = false;
DistributedObjectStore *objectStore_;
DistributedObject *object_;
JSWatcher *watcher_;
};
class StatusEventListener : public EventListener {
public:
StatusEventListener(JSWatcher *watcher, DistributedObjectStore *objectStore, DistributedObject *object);
bool Add(napi_env env, napi_value handler) override;
bool Del(napi_env env, napi_value handler) override;
void Clear(napi_env env) override;
private:
// bool isWatched_ = false;
// DistributedObjectStore *objectStore_;
// DistributedObject *object_;
// JSWatcher *watcher_;
};
class JSWatcher : public UvQueue {
public:
JSWatcher(const napi_env env, DistributedObjectStore *objectStore, DistributedObject *object);
~JSWatcher();
void On(const char *type, napi_value handler);
void Off(const char *type, napi_value handler = nullptr);
void Emit(const char *type, const std::string &sessionId, const std::vector<std::string> &changeData);
Event Find(const char *type) const;
private:
EventListener *Find(const char *type);
napi_env env_;
EventListener listeners_[3];
DistributedObjectStore *objectStore_;
DistributedObject *object_;
ChangeEventListener *changeEventListener_;
StatusEventListener *statusEventListener_;
};
class WatcherImpl : public ObjectWatcher, FlatObjectWatcher {
class WatcherImpl
: public ObjectWatcher
, FlatObjectWatcher {
public:
WatcherImpl(JSWatcher *watcher, const std::string &sessionId) : FlatObjectWatcher(sessionId), watcher_(watcher)
{
}
virtual ~WatcherImpl();
void OnChanged(const std::string &sessionid, const std::vector<std::string> &changedData) override;
void OnDeleted(const std::string &sessionid) override;
private:
JSWatcher *watcher_ = nullptr;
@@ -24,8 +24,6 @@
namespace OHOS::ObjectStore {
class UvQueue {
using NapiArgsGenerator = std::function<void(napi_env env, napi_ref &fuc, int &argc, napi_value *argv)>;
public:
UvQueue(napi_env env);
virtual ~UvQueue();
@@ -15,14 +15,13 @@
#include "js_distributedobjectstore.h"
#include "ability.h"
#include "ability_context.h"
#include "distributed_objectstore.h"
#include "js_common.h"
#include "js_distributedobject.h"
#include "js_object_wrapper.h"
#include "js_util.h"
#include "logger.h"
#include "napi/native_node_api.h"
#include "objectstore_errors.h"
namespace OHOS::ObjectStore {
@@ -106,32 +105,6 @@ napi_value JSDistributedObjectStore::JSDestroyObjectSync(napi_env env, napi_call
return result;
}
// function sync(object_: DistributedObject): number;
napi_value JSDistributedObjectStore::JSSync(napi_env env, napi_callback_info info)
{
LOG_INFO("start");
size_t requireArgc = 1;
size_t argc = 1;
napi_value argv[1] = { 0 };
napi_value thisVar = nullptr;
void *data = nullptr;
JSObjectWrapper *objectWrapper = nullptr;
napi_status status = napi_get_cb_info(env, info, &argc, argv, &thisVar, &data);
CHECK_EQUAL_WITH_RETURN_NULL(status, napi_ok);
ASSERT_MATCH_ELSE_RETURN_NULL(argc >= requireArgc);
status = napi_unwrap(env, argv[0], (void **)&objectWrapper);
CHECK_EQUAL_WITH_RETURN_NULL(status, napi_ok);
ASSERT_MATCH_ELSE_RETURN_NULL(objectWrapper != nullptr);
DistributedObjectStore *objectInfo =
DistributedObjectStore::GetInstance(JSDistributedObjectStore::GetBundleName(env));
ASSERT_MATCH_ELSE_RETURN_NULL(objectInfo != nullptr);
uint32_t ret = objectInfo->Sync(objectWrapper->GetObject());
napi_value result = nullptr;
napi_create_int32(env, ret, &result);
return result;
}
// function on(type: 'change', object: DistributedObject, callback: Callback<ChangedDataObserver>): void;
// function on(type: 'status', object: DistributedObject, callback: Callback<ObjectStatusObserver>): void;
napi_value JSDistributedObjectStore::JSOn(napi_env env, napi_callback_info info)
@@ -225,26 +198,6 @@ napi_value JSDistributedObjectStore::JSOff(napi_env env, napi_callback_info info
std::string JSDistributedObjectStore::GetBundleName(napi_env env)
{
napi_value global = nullptr;
napi_status status = napi_get_global(env, &global);
if (status != napi_ok || global == nullptr) {
LOG_ERROR("Cannot get global instance for %{public}d", status);
return std::string();
}
napi_value abilityContext = nullptr;
status = napi_get_named_property(env, global, "ability", &abilityContext);
if (status != napi_ok || abilityContext == nullptr) {
LOG_ERROR("Cannot get ability context for %{public}d", status);
return std::string();
}
AppExecFwk::Ability *ability = nullptr;
status = napi_get_value_external(env, abilityContext, (void **)&ability);
if (status != napi_ok || ability == nullptr) {
LOG_ERROR("Get ability form property failed for %{public}d", status);
return std::string();
}
return ability->GetBundleName();
return AbilityRuntime::Context::GetApplicationContext()->GetBundleName();
}
} // namespace OHOS::ObjectStore
@@ -34,7 +34,6 @@ static napi_value DistributedDataObjectExport(napi_env env, napi_value exports)
static napi_property_descriptor desc[] = {
DECLARE_NAPI_FUNCTION("createObjectSync", JSDistributedObjectStore::JSCreateObjectSync),
DECLARE_NAPI_FUNCTION("destroyObjectSync", JSDistributedObjectStore::JSDestroyObjectSync),
DECLARE_NAPI_FUNCTION("sync", JSDistributedObjectStore::JSSync),
DECLARE_NAPI_FUNCTION("on", JSDistributedObjectStore::JSOn),
DECLARE_NAPI_FUNCTION("off", JSDistributedObjectStore::JSOff),
};
@@ -26,63 +26,42 @@
namespace OHOS::ObjectStore {
JSWatcher::JSWatcher(const napi_env env, DistributedObjectStore *objectStore, DistributedObject *object)
: UvQueue(env), env_(env), objectStore_(objectStore), object_(object)
: UvQueue(env), env_(env)
{
listeners_[EVENT_CHANGE].type_ = "change";
listeners_[EVENT_STATUS].type_ = "status";
changeEventListener_ = new ChangeEventListener(this, objectStore, object);
statusEventListener_ = new StatusEventListener(this, objectStore, object);
}
JSWatcher::~JSWatcher()
{
listeners_[EVENT_CHANGE].Clear(env_);
listeners_[EVENT_STATUS].Clear(env_);
if (objectStore_ != nullptr) {
objectStore_->UnWatch(object_);
}
delete changeEventListener_;
delete statusEventListener_;
changeEventListener_ = nullptr;
statusEventListener_ = nullptr;
}
void JSWatcher::On(const char *type, napi_value handler)
{
Event event = Find(type);
if (event == EVENT_UNKNOWN) {
EventListener *listener = Find(type);
if (listener == nullptr) {
LOG_ERROR("error type %{public}s", type);
return;
}
LOG_ERROR("add %{public}p", handler);
bool isEmpty = listeners_[event].Add(env_, handler);
if (isEmpty && event == EVENT_CHANGE) {
std::shared_ptr<WatcherImpl> watcher = std::make_shared<WatcherImpl>(this, object_->GetSessionId());
uint32_t ret = objectStore_->Watch(object_, watcher);
if (ret != SUCCESS) {
LOG_ERROR("watch %{public}s error", object_->GetSessionId().c_str());
} else {
LOG_INFO("watch %{public}s success", object_->GetSessionId().c_str());
}
}
listener->Add(env_, handler);
}
void JSWatcher::Off(const char *type, napi_value handler)
{
Event event = Find(type);
if (event == EVENT_UNKNOWN) {
EventListener *listener = Find(type);
if (listener == nullptr) {
LOG_ERROR("error type %{public}s", type);
return;
}
LOG_INFO("start del %{public}s %{public}p", object_->GetSessionId().c_str(), handler);
bool isEmpty = true;
if (handler == nullptr) {
listeners_[event].Clear(env_);
listener->Clear(env_);
} else {
isEmpty = listeners_[event].Del(env_, handler);
listener->Del(env_, handler);
}
if (isEmpty && event == EVENT_CHANGE) {
std::shared_ptr<WatcherImpl> watcher = std::make_shared<WatcherImpl>(this, object_->GetSessionId());
uint32_t ret = objectStore_->UnWatch(object_);
if (ret != SUCCESS) {
LOG_ERROR("unWatch %{public}s error", object_->GetSessionId().c_str());
} else {
LOG_INFO("unWatch %{public}s success", object_->GetSessionId().c_str());
}
}
LOG_INFO("end %{public}s", object_->GetSessionId().c_str());
}
void JSWatcher::Emit(const char *type, const std::string &sessionId, const std::vector<std::string> &changeData)
@@ -92,25 +71,24 @@ void JSWatcher::Emit(const char *type, const std::string &sessionId, const std::
return;
}
LOG_ERROR("start %{public}s, %{public}s", sessionId.c_str(), changeData.at(0).c_str());
Event event = Find(type);
if (event == EVENT_UNKNOWN) {
LOG_ERROR("unknow %{public}s", type);
EventListener *listener = Find(type);
if (listener == nullptr) {
LOG_ERROR("error type %{public}s", type);
return;
}
for (EventHandler *handler = listeners_[event].handlers_; handler != nullptr; handler = handler->next) {
for (EventHandler *handler = listener->handlers_; handler != nullptr; handler = handler->next) {
CallFunction(sessionId, changeData, handler->callbackRef);
}
}
Event JSWatcher::Find(const char *type) const
EventListener *JSWatcher::Find(const char *type)
{
Event result = EVENT_UNKNOWN;
if (!strcmp(listeners_[EVENT_CHANGE].type_, type)) {
result = EVENT_CHANGE;
} else if (!strcmp(listeners_[EVENT_STATUS].type_, type)) {
result = EVENT_STATUS;
if (!strcmp("change", type)) {
return changeEventListener_;
}
return result;
if (!strcmp("status", type)) {
return statusEventListener_;
}
return nullptr;
}
EventHandler *EventListener::Find(napi_env env, napi_value handler)
@@ -187,11 +165,75 @@ void WatcherImpl::OnChanged(const std::string &sessionid, const std::vector<std:
watcher_->Emit("change", sessionid, changedData);
}
void WatcherImpl::OnDeleted(const std::string &sessionid)
{
}
WatcherImpl::~WatcherImpl()
{
}
bool ChangeEventListener::Add(napi_env env, napi_value handler)
{
if (!isWatched_ && object_ != nullptr) {
std::shared_ptr<WatcherImpl> watcher = std::make_shared<WatcherImpl>(watcher_, object_->GetSessionId());
uint32_t ret = objectStore_->Watch(object_, watcher);
if (ret != SUCCESS) {
LOG_ERROR("Watch %{public}s error", object_->GetSessionId().c_str());
} else {
LOG_INFO("Watch %{public}s success", object_->GetSessionId().c_str());
isWatched_ = true;
}
}
return EventListener::Add(env, handler);
}
bool ChangeEventListener::Del(napi_env env, napi_value handler)
{
bool isEmpty = EventListener::Del(env, handler);
if (isEmpty && isWatched_ && object_ != nullptr) {
uint32_t ret = objectStore_->UnWatch(object_);
if (ret != SUCCESS) {
LOG_ERROR("UnWatch %{public}s error", object_->GetSessionId().c_str());
} else {
LOG_INFO("UnWatch %{public}s success", object_->GetSessionId().c_str());
isWatched_ = false;
}
}
return isEmpty;
}
void ChangeEventListener::Clear(napi_env env)
{
EventListener::Clear(env);
if (isWatched_ && object_ != nullptr) {
uint32_t ret = objectStore_->UnWatch(object_);
if (ret != SUCCESS) {
LOG_ERROR("UnWatch %{public}s error", object_->GetSessionId().c_str());
} else {
LOG_INFO("UnWatch %{public}s success", object_->GetSessionId().c_str());
isWatched_ = false;
}
}
}
ChangeEventListener::ChangeEventListener(JSWatcher *watcher, DistributedObjectStore *objectStore, DistributedObject *object)
: objectStore_(objectStore), object_(object), watcher_(watcher)
{
}
StatusEventListener::StatusEventListener(JSWatcher *watcher, DistributedObjectStore *objectStore, DistributedObject *object)
{
}
bool StatusEventListener::Add(napi_env env, napi_value handler)
{
return EventListener::Add(env, handler);
}
bool StatusEventListener::Del(napi_env env, napi_value handler)
{
return EventListener::Del(env, handler);
}
void StatusEventListener::Clear(napi_env env)
{
EventListener::Clear(env);
}
} // namespace OHOS::ObjectStore
@@ -42,7 +42,6 @@ public:
class ObjectWatcher {
public:
virtual void OnChanged(const std::string &sessionid, const std::vector<std::string> &changedData) = 0;
virtual void OnDeleted(const std::string &sessionid) = 0;
};
} // namespace OHOS::ObjectStore
#endif // DISTRIBUTED_OBJECT_H
@@ -28,10 +28,11 @@ public:
static DistributedObjectStore *GetInstance(const std::string &bundleName);
virtual DistributedObject *CreateObject(const std::string &sessionId) = 0;
virtual uint32_t Get(const std::string &sessionId, DistributedObject *object) = 0;
virtual uint32_t Sync(DistributedObject *object) = 0;
virtual uint32_t DeleteObject(const std::string &sessionId) = 0;
virtual uint32_t Watch(DistributedObject *object, std::shared_ptr<ObjectWatcher> objectWatcher) = 0;
virtual uint32_t UnWatch(DistributedObject *object) = 0;
virtual void TriggerSync();
virtual void TriggerRestore(std::function<void()> notifier);
};
} // namespace OHOS::ObjectStore
+4 -3
View File
@@ -22,12 +22,13 @@ constexpr uint32_t BASE_ERR_OFFSET = 1650;
/* module defined errors */
constexpr uint32_t SUCCESS = 0;
constexpr uint32_t ERR_DB_SET_PROCESS = BASE_ERR_OFFSET + 1;
constexpr uint32_t ERR_EXIST = BASE_ERR_OFFSET + 2;
constexpr uint32_t ERR_DATA_LEN = BASE_ERR_OFFSET + 3;
constexpr uint32_t ERR_MOMEM = BASE_ERR_OFFSET + 4;
constexpr uint32_t ERR_NOMEM = BASE_ERR_OFFSET + 4;
constexpr uint32_t ERR_DB_NOT_INIT = BASE_ERR_OFFSET + 5;
constexpr uint32_t ERR_DE_GETKV_FAIL = BASE_ERR_OFFSET + 6;
constexpr uint32_t ERR_DE_NOT_EXIST = BASE_ERR_OFFSET + 7;
constexpr uint32_t ERR_DB_GETKV_FAIL = BASE_ERR_OFFSET + 6;
constexpr uint32_t ERR_DB_NOT_EXIST = BASE_ERR_OFFSET + 7;
constexpr uint32_t ERR_DB_GET_FAIL = BASE_ERR_OFFSET + 8;
constexpr uint32_t ERR_DB_ENTRY_FAIL = BASE_ERR_OFFSET + 9;
constexpr uint32_t ERR_CLOSE_STORAGE = BASE_ERR_OFFSET + 10;
+4 -7
View File
@@ -57,6 +57,7 @@ config("objectstore_config") {
"../../frameworks/innerkitsimpl/include/communicator",
"../../interfaces/innerkits",
"//third_party/bounds_checking_function/include",
"//foundation/aafwk/standard/frameworks/kits/appkit/native/ability_runtime/context",
]
}
@@ -107,7 +108,7 @@ ohos_shared_library("distributeddataobject") {
":distributed_data_object_abc",
":distributed_data_object_js",
"//foundation/aafwk/standard/frameworks/kits/ability/native:abilitykit_native",
"//foundation/ace/napi:ace_napi",
"//foundation/aafwk/standard/interfaces/innerkits/want:want",
"//foundation/distributeddatamgr/distributeddatamgr/services/distributeddataservice/libs/distributeddb:distributeddb",
"//foundation/distributeddatamgr/objectstore/interfaces/innerkits:distributeddataobject_impl",
"//third_party/libuv:uv_static",
@@ -116,14 +117,10 @@ ohos_shared_library("distributeddataobject") {
]
external_deps = [
"ability_runtime:ability_manager",
"ability_runtime:app_manager",
"ability_runtime:want",
"bundle_framework:appexecfwk_base",
"bundle_framework:appexecfwk_core",
"ability_runtime:runtime",
"dsoftbus_standard:softbus_client",
"hiviewdfx_hilog_native:libhilog",
"native_appdatamgr:native_appdatafwk",
"native_appdatamgr:native_rdb",
"napi:ace_napi",
]
public_configs = [ ":objectstore_public_config" ]
+7 -5
View File
@@ -26,6 +26,7 @@ class Distributed {
this.__proxy[SESSION_ID] = newValue;
}
});
console.info("constructor success ");
};
setSessionId(sessionId) {
@@ -53,7 +54,6 @@ class Distributed {
off(type, callback) {
offWatch(type, this.__proxy, callback);
};
__proxy;
}
@@ -97,7 +97,9 @@ function joinSession(obj, sessionId) {
console.info("end set " + key + " " + newValue);
}
});
object[key] = obj[key];
if (obj[key] != undefined) {
object[key] = obj[key];
}
});
Object.defineProperty(object, SESSION_ID, {
@@ -113,16 +115,16 @@ function leaveSession(obj) {
console.warn("object is null");
return;
}
// disconnect,delete object
distributedObject.destroyObjectSync(obj);
Object.keys(obj).forEach(key => {
Object.defineProperty(obj, key, {
value: obj[key],
value: obj.get(key),
configurable: true,
writable: true,
enumerable: true,
});
});
// disconnect,delete object
distributedObject.destroyObjectSync(obj);
delete obj[SESSION_ID];
}