!631 对于超过 200k 的 RdbChangeNode 数据,改为使用共享内存传输,避免使用 IPC 传输

Merge pull request !631 from 中饭吃啥/master
This commit is contained in:
openharmony_ci 2024-11-17 09:18:14 +00:00 committed by Gitee
commit 3962601207
No known key found for this signature in database
GPG Key ID: 173E9B9CA92EEF8F
6 changed files with 425 additions and 6 deletions

View File

@ -134,13 +134,37 @@ bool Unmarshalling(PredicateTemplateNode &predicateTemplateNode, MessageParcel &
template<>
bool Marshalling(const RdbChangeNode &changeNode, MessageParcel &parcel)
{
return ITypesUtil::Marshal(parcel, changeNode.uri_, changeNode.templateId_, changeNode.data_);
bool firstPart = ITypesUtil::Marshal(
parcel, changeNode.uri_, changeNode.templateId_, changeNode.data_, changeNode.isSharedMemory_);
if (!firstPart) {
return false;
}
if (changeNode.isSharedMemory_) {
if (changeNode.memory_ == nullptr) {
LOG_ERROR("Used shared memory but ashmem is nullptr.");
return false;
}
if (!parcel.WriteAshmem(changeNode.memory_)) {
return false;
}
}
return ITypesUtil::Marshal(parcel, changeNode.size_);
}
template<>
bool Unmarshalling(RdbChangeNode &changeNode, MessageParcel &parcel)
{
return ITypesUtil::Unmarshal(parcel, changeNode.uri_, changeNode.templateId_, changeNode.data_);
bool firstPart = ITypesUtil::Unmarshal(
parcel, changeNode.uri_, changeNode.templateId_, changeNode.data_, changeNode.isSharedMemory_);
if (!firstPart) {
return false;
}
if (changeNode.isSharedMemory_) {
changeNode.memory_ = parcel.ReadAshmem();
} else {
changeNode.memory_ = nullptr;
}
return ITypesUtil::Unmarshal(parcel, changeNode.size_);
}
template<>

View File

@ -28,9 +28,12 @@ public:
RdbObserverStub(RdbCallback callback);
virtual ~RdbObserverStub();
int OnRemoteRequest(uint32_t code, MessageParcel &data, MessageParcel &reply, MessageOption &option) override;
void OnChangeFromRdb(const RdbChangeNode &changeNode);
void OnChangeFromRdb(RdbChangeNode &changeNode);
void ClearCallback();
private:
int RecoverRdbChangeNodeData(RdbChangeNode &changeNode);
int DeserializeDataFromAshmem(RdbChangeNode &changeNode);
int ReadAshmem(RdbChangeNode &changeNode, const void **data, int size, int &offset);
std::mutex mutex_;
RdbCallback callback_;
};

View File

@ -42,9 +42,97 @@ int RdbObserverStub::OnRemoteRequest(uint32_t code, MessageParcel &data, Message
return ERR_OK;
}
void RdbObserverStub::OnChangeFromRdb(const RdbChangeNode &changeNode)
int RdbObserverStub::ReadAshmem(RdbChangeNode &changeNode, const void **data, int size, int &offset)
{
if (changeNode.memory_ == nullptr) {
LOG_ERROR("changeNode memory is nullptr.");
return E_ERROR;
}
const void *read = changeNode.memory_->ReadFromAshmem(size, offset);
if (read == nullptr) {
LOG_ERROR("failed to read from ashmem.");
changeNode.memory_->UnmapAshmem();
changeNode.memory_->CloseAshmem();
changeNode.memory_ = nullptr;
return E_ERROR;
}
*data = read;
offset += size;
return E_OK;
}
int RdbObserverStub::DeserializeDataFromAshmem(RdbChangeNode &changeNode)
{
if (changeNode.memory_ == nullptr) {
LOG_ERROR("changeNode.memory_ is null.");
return E_ERROR;
}
bool mapRet = changeNode.memory_->MapReadAndWriteAshmem();
if (!mapRet) {
LOG_ERROR("failed to map read and write ashmem, ret=%{public}d", mapRet);
changeNode.memory_->CloseAshmem();
changeNode.memory_ = nullptr;
return E_ERROR;
}
LOG_DEBUG("receive data size: %{public}d", changeNode.size_);
// Read data size
int intLen = 4;
int offset = 0;
const int *vecLenRead;
if (ReadAshmem(changeNode, (const void **)&vecLenRead, intLen, offset) != E_OK) {
LOG_ERROR("failed to read data with len %{public}d, offset %{public}d.", intLen, offset);
return E_ERROR;
}
int vecLen = *vecLenRead;
// Read data
for (int i = 0; i < vecLen; i++) {
const int *dataLenRead;
if (ReadAshmem(changeNode, (const void **)&dataLenRead, intLen, offset) != E_OK) {
LOG_ERROR(
"failed to read data with index %{public}d, len %{public}d, offset %{public}d.", i, intLen, offset);
return E_ERROR;
}
int dataLen = *dataLenRead;
const char *dataRead;
if (ReadAshmem(changeNode, (const void **)&dataRead, dataLen, offset) != E_OK) {
LOG_ERROR(
"failed to read data with index %{public}d, len %{public}d, offset %{public}d.", i, dataLen, offset);
return E_ERROR;
}
std::string data(dataRead, dataLen);
changeNode.data_.push_back(data);
}
return E_OK;
}
int RdbObserverStub::RecoverRdbChangeNodeData(RdbChangeNode &changeNode)
{
int ret = E_OK;
if (changeNode.isSharedMemory_) {
// Recover form Ashmem
if (DeserializeDataFromAshmem(changeNode) != E_OK) {
LOG_ERROR("failed to deserialize data from ashmem.");
ret = E_ERROR;
}
if (changeNode.memory_ != nullptr) {
changeNode.memory_->UnmapAshmem();
changeNode.memory_->CloseAshmem();
changeNode.memory_ = nullptr;
}
changeNode.isSharedMemory_ = false;
changeNode.size_ = 0;
}
return ret;
}
void RdbObserverStub::OnChangeFromRdb(RdbChangeNode &changeNode)
{
std::lock_guard<decltype(mutex_)> lock(mutex_);
if (RecoverRdbChangeNodeData(changeNode) != E_OK) {
LOG_ERROR("failed to recover RdbChangeNode data.");
return;
}
if (callback_) {
callback_(changeNode);
}

View File

@ -23,6 +23,19 @@
namespace OHOS {
namespace DataShare {
/**
* Specifies the upper limit of size of data that RdbChangeNode will transfer by IPC. Currently it's 200k.
*/
constexpr int32_t DATA_SIZE_IPC_TRANSFER_LIMIT = 200 << 10;
/**
* Specifies the upper limit of size of data that RdbChangeNode will transfer by the shared memory. Currently it's 10M.
*/
constexpr int32_t DATA_SIZE_ASHMEM_TRANSFER_LIMIT = (10 << 10) << 10;
/**
* Specifies the name of the shared memory that RdbChangeNode will transfer.
*/
constexpr const char* ASHMEM_NAME = "DataShareRdbChangeNode";
/**
* Specifies the predicates structure of the template.
*/
@ -152,6 +165,16 @@ struct RdbChangeNode {
TemplateId templateId_;
/** Specifies the datas of the callback. */
std::vector<std::string> data_;
/** Specifies whether to use the shared meomry to transfer data. This will be set to be true when the size of
* the data is more than 200k, but no more than 10M. Usually the data will not be as large as 10M.
*/
bool isSharedMemory_ = false;
/** Specifies the address of the shared memory, wrapped by `OHOS::sptr<Ashmem>`.
* (De)serialization: [vec_size(int32); str1_len(int32), str1; str2_len(int32), str2; ...]
*/
OHOS::sptr<Ashmem> memory_;
/** Specifies the data size transferred the shared memory */
int32_t size_;
};
/**

View File

@ -52,6 +52,8 @@ config("permission_config") {
ohos_unittest("NativeDataShareTest") {
module_out_path = "data_share/native_datashare"
visibility = [ ":*" ]
include_dirs = [
"//foundation/ability/ability_runtime/interfaces/inner_api/ability_manager/include",
"//foundation/ability/ability_runtime/interfaces/inner_api/app_manager/include/appmgr",
@ -65,10 +67,13 @@ ohos_unittest("NativeDataShareTest") {
"//utils/system/safwk/native/include",
"//foundation/communication/ipc/interfaces/innerkits/ipc_core/include",
"//third_party/json/include",
"${datashare_native_proxy_path}/include",
]
sources =
[ "./unittest/mediadatashare_test/src/mediadatashare_unit_test.cpp" ]
sources = [
"${datashare_native_proxy_path}/src/data_proxy_observer_stub.cpp",
"./unittest/mediadatashare_test/src/mediadatashare_unit_test.cpp",
]
deps = [
"${datashare_innerapi_path}:datashare_consumer",
@ -93,6 +98,12 @@ ohos_unittest("NativeDataShareTest") {
"safwk:system_ability_fwk",
"samgr:samgr_proxy",
]
cflags = [
"-fvisibility=hidden",
"-Dprivate=public",
"-Dprotected=public",
]
}
ohos_unittest("PermissionTest") {

View File

@ -23,6 +23,7 @@
#include "dataobs_mgr_changeinfo.h"
#include "datashare_log.h"
#include "datashare_valuebucket_convert.h"
#include "data_proxy_observer_stub.h"
#include "hap_token_info.h"
#include "iservice_registry.h"
#include "rdb_data_ability_utils.h"
@ -1673,5 +1674,274 @@ HWTEST_F(MediaDataShareUnitTest, MediaDataShare_BatchUpdateThanLimit_Test_001, T
EXPECT_EQ(results.size(), 0);
LOG_INFO("MediaDataShare_BatchUpdateThanLimit_Test_001 End");
}
void OnChangeCallback(const RdbChangeNode &changeNode)
{
// In test, put 2 uris into the data vec
int vecLen = 2;
EXPECT_EQ(changeNode.data_.size(), vecLen);
for (int i = 0; i < vecLen; i++) {
EXPECT_EQ(changeNode.data_[i], DATA_SHARE_URI);
}
}
void PrepareNodeContent(RdbChangeNode &node)
{
OHOS::sptr<Ashmem> memory = Ashmem::CreateAshmem("PrepareNodeContent", DATA_SIZE_ASHMEM_TRANSFER_LIMIT);
EXPECT_NE(memory, nullptr);
bool mapRet = memory->MapReadAndWriteAshmem();
ASSERT_TRUE(mapRet);
// write 2 uris
int vecLen = 2;
int intByteLen = 4;
int offset = 0;
bool writeRet = memory->WriteToAshmem((void*)&vecLen, intByteLen, offset);
ASSERT_TRUE(writeRet);
offset += intByteLen;
int len = DATA_SHARE_URI.length();
const char *str = DATA_SHARE_URI.c_str();
for (int i = 0; i < vecLen; i++) {
writeRet = memory->WriteToAshmem((void*)&len, intByteLen, offset);
ASSERT_TRUE(writeRet);
offset += intByteLen;
writeRet = memory->WriteToAshmem((void*)str, len, offset);
ASSERT_TRUE(writeRet);
offset += len;
}
node.memory_ = memory;
node.size_ = offset;
node.isSharedMemory_ = true;
}
/**
* @tc.name: ReadAshmem
* @tc.desc: test ReadAshmem function.
* @tc.type: FUNC
* @tc.require:
*/
HWTEST_F(MediaDataShareUnitTest, ReadAshmem, TestSize.Level1)
{
LOG_INFO("ReadAshmem starts");
RdbChangeNode node;
OHOS::sptr<Ashmem> memory = Ashmem::CreateAshmem("ReadAshmem", DATA_SIZE_ASHMEM_TRANSFER_LIMIT);
EXPECT_NE(memory, nullptr);
bool mapRet = memory->MapReadAndWriteAshmem();
ASSERT_TRUE(mapRet);
int len = DATA_SHARE_URI.length();
bool writeRet = memory->WriteToAshmem((void*)&len, 4, 0);
ASSERT_TRUE(writeRet);
const char *str = DATA_SHARE_URI.c_str();
writeRet = memory->WriteToAshmem((void*)str, len, 4);
ASSERT_TRUE(writeRet);
node.memory_ = memory;
RdbObserverStub stub(OnChangeCallback);
// Read an int
const int *lenRead;
int offset = 0;
int readRet = stub.ReadAshmem(node, (const void**)&lenRead, 4, offset);
EXPECT_EQ(readRet, E_OK);
EXPECT_EQ(offset, 4);
int lenFromAshmem = *lenRead;
EXPECT_EQ(lenFromAshmem, len);
// Read a string
readRet = stub.ReadAshmem(node, (const void**)&str, lenFromAshmem, offset);
EXPECT_EQ(readRet, E_OK);
EXPECT_EQ(offset, 4 + len);
std::string strRead(str, lenFromAshmem);
EXPECT_EQ(strRead, DATA_SHARE_URI);
// Error path test
readRet = stub.ReadAshmem(node, (const void**)&str, DATA_SIZE_ASHMEM_TRANSFER_LIMIT, offset);
EXPECT_EQ(readRet, E_ERROR);
LOG_INFO("ReadAshmem ends");
}
/**
* @tc.name: DeserializeDataFromAshmem001
* @tc.desc: test DeserializeDataFromAshmem function.
* @tc.type: FUNC
* @tc.require:
*/
HWTEST_F(MediaDataShareUnitTest, DeserializeDataFromAshmem001, TestSize.Level1)
{
LOG_INFO("DeserializeDataFromAshmem001::Start");
RdbChangeNode node;
PrepareNodeContent(node);
RdbObserverStub stub(OnChangeCallback);
int readRet = stub.DeserializeDataFromAshmem(node);
EXPECT_EQ(readRet, E_OK);
EXPECT_EQ(node.data_.size(), 2);
for (int i = 0; i < 2; i++) {
EXPECT_EQ(node.data_[i], DATA_SHARE_URI);
}
LOG_INFO("DeserializeDataFromAshmem001::End");
}
/**
* @tc.name: DeserializeDataFromAshmem002
* @tc.desc: test DeserializeDataFromAshmem function, error tests.
* @tc.type: FUNC
* @tc.require:
*/
HWTEST_F(MediaDataShareUnitTest, DeserializeDataFromAshmem002, TestSize.Level1)
{
LOG_INFO("DeserializeDataFromAshmem002::Start");
RdbChangeNode node;
RdbObserverStub stub(OnChangeCallback);
// memory_ is null.
int ret = stub.DeserializeDataFromAshmem(node);
EXPECT_EQ(ret, E_ERROR);
// Error in read from Ashmem with error string length.
OHOS::sptr<Ashmem> memory = Ashmem::CreateAshmem("DeserializeDataFromAshmem002", DATA_SIZE_ASHMEM_TRANSFER_LIMIT);
EXPECT_NE(memory, nullptr);
bool mapRet = memory->MapReadAndWriteAshmem();
ASSERT_TRUE(mapRet);
int vecLen = 1;
int offset = 0;
bool writeRet = memory->WriteToAshmem((void*)&vecLen, 4, offset);
ASSERT_TRUE(writeRet);
offset += 4;
int len = DATA_SHARE_URI.length();
int errorLen = DATA_SIZE_ASHMEM_TRANSFER_LIMIT;
const char *str = DATA_SHARE_URI.c_str();
writeRet = memory->WriteToAshmem((void*)&errorLen, 4, offset);
ASSERT_TRUE(writeRet);
offset += 4;
writeRet = memory->WriteToAshmem((void*)str, len, offset);
ASSERT_TRUE(writeRet);
node.memory_ = memory;
ret = stub.DeserializeDataFromAshmem(node);
EXPECT_EQ(ret, E_ERROR);
// Error in read from Ashmem with vec size
OHOS::sptr<Ashmem> memory2 = Ashmem::CreateAshmem("DeserializeDataFromAshmem002", 2);
EXPECT_NE(memory2, nullptr);
mapRet = memory2->MapReadAndWriteAshmem();
ASSERT_TRUE(mapRet);
node.memory_ = memory2;
ret = stub.DeserializeDataFromAshmem(node);
EXPECT_EQ(ret, E_ERROR);
// Error in read from Ashmem with str size
OHOS::sptr<Ashmem> memory3 = Ashmem::CreateAshmem("DeserializeDataFromAshmem002", 5);
EXPECT_NE(memory3, nullptr);
mapRet = memory3->MapReadAndWriteAshmem();
ASSERT_TRUE(mapRet);
writeRet = memory3->WriteToAshmem((void*)&vecLen, 4, 0);
ASSERT_TRUE(writeRet);
node.memory_ = memory3;
ret = stub.DeserializeDataFromAshmem(node);
EXPECT_EQ(ret, E_ERROR);
LOG_INFO("DeserializeDataFromAshmem002::End");
}
/**
* @tc.name: RecoverRdbChangeNodeData001
* @tc.desc: test RecoverRdbChangeNodeData function
* @tc.type: FUNC
* @tc.require:
*/
HWTEST_F(MediaDataShareUnitTest, RecoverRdbChangeNodeData001, TestSize.Level0)
{
LOG_INFO("RecoverRdbChangeNodeData::Start");
// Recover
RdbChangeNode node;
PrepareNodeContent(node);
RdbObserverStub stub(OnChangeCallback);
int ret = stub.RecoverRdbChangeNodeData(node);
EXPECT_EQ(ret, E_OK);
EXPECT_EQ(node.data_.size(), 2);
for (int i = 0; i < 2; i++) {
EXPECT_EQ(node.data_[i], DATA_SHARE_URI);
}
EXPECT_EQ(node.memory_, nullptr);
EXPECT_EQ(node.size_, 0);
ASSERT_FALSE(node.isSharedMemory_);
// Not recover
RdbChangeNode node2;
PrepareNodeContent(node2);
node2.isSharedMemory_ = false;
ret = stub.RecoverRdbChangeNodeData(node2);
EXPECT_EQ(ret, E_OK);
EXPECT_EQ(node2.data_.size(), 0);
EXPECT_NE(node2.memory_, nullptr);
EXPECT_EQ(node2.size_, 82);
ASSERT_FALSE(node2.isSharedMemory_);
LOG_INFO("RecoverRdbChangeNodeData End");
}
/**
* @tc.name: RecoverRdbChangeNodeData002
* @tc.desc: test RecoverRdbChangeNodeData function with error
* @tc.type: FUNC
* @tc.require:
*/
HWTEST_F(MediaDataShareUnitTest, RecoverRdbChangeNodeData002, TestSize.Level0)
{
LOG_INFO("RecoverRdbChangeNodeData002::Start");
RdbChangeNode node;
node.isSharedMemory_ = true;
RdbObserverStub stub(OnChangeCallback);
int ret = stub.RecoverRdbChangeNodeData(node);
EXPECT_EQ(ret, E_ERROR);
EXPECT_EQ(node.data_.size(), 0);
EXPECT_EQ(node.memory_, nullptr);
EXPECT_EQ(node.size_, 0);
ASSERT_FALSE(node.isSharedMemory_);
LOG_INFO("RecoverRdbChangeNodeData002::End");
}
/**
* @tc.name: OnChangeFromRdb001
* @tc.desc: test OnChangeFromRdb function
* @tc.type: FUNC
* @tc.require:
*/
HWTEST_F(MediaDataShareUnitTest, OnChangeFromRdb001, TestSize.Level0)
{
LOG_INFO("OnChangeFromRdb001::Start");
RdbChangeNode node;
PrepareNodeContent(node);
RdbObserverStub stub(OnChangeCallback);
stub.OnChangeFromRdb(node);
EXPECT_EQ(node.data_.size(), 2);
for (int i = 0; i < 2; i++) {
EXPECT_EQ(node.data_[i], DATA_SHARE_URI);
}
EXPECT_EQ(node.memory_, nullptr);
EXPECT_EQ(node.size_, 0);
ASSERT_FALSE(node.isSharedMemory_);
LOG_INFO("OnChangeFromRdb001::End");
}
/**
* @tc.name: OnChangeFromRdb002
* @tc.desc: test OnChangeFromRdb function with error
* @tc.type: FUNC
* @tc.require:
*/
HWTEST_F(MediaDataShareUnitTest, OnChangeFromRdb002, TestSize.Level0)
{
LOG_INFO("OnChangeFromRdb002::Start");
RdbChangeNode node;
node.isSharedMemory_ = true;
RdbObserverStub stub(OnChangeCallback);
stub.OnChangeFromRdb(node);
EXPECT_EQ(node.data_.size(), 0);
EXPECT_EQ(node.memory_, nullptr);
EXPECT_EQ(node.size_, 0);
ASSERT_FALSE(node.isSharedMemory_);
LOG_INFO("OnChangeFromRdb002::End");
}
} // namespace DataShare
} // namespace OHOS