!2378 feat:资产并行下载

Merge pull request !2378 from wTong888/master
This commit is contained in:
openharmony_ci 2024-10-29 11:40:24 +00:00 committed by Gitee
commit 405a4576cb
No known key found for this signature in database
GPG Key ID: 173E9B9CA92EEF8F
6 changed files with 189 additions and 6 deletions

View File

@ -21,6 +21,12 @@ int32_t AssetLoader::Download(
return E_NOT_SUPPORT;
}
void AssetLoader::Download(const std::string &tableName, std::vector<AssetsRecord> &assetsRecords)
{
(void)assetsRecords;
return;
}
int32_t AssetLoader::RemoveLocalAssets(
const std::string &tableName, const std::string &gid, const Value &prefix, VBucket &assets)
{

View File

@ -21,9 +21,11 @@
namespace OHOS::DistributedData {
class API_EXPORT AssetLoader {
public:
using AssetsRecord = DistributedData::AssetRecord;
virtual ~AssetLoader() = default;
virtual int32_t Download(
const std::string &tableName, const std::string &gid, const Value &prefix, VBucket &assets);
virtual void Download(const std::string &tableName, std::vector<AssetsRecord> &assetsRecords);
virtual int32_t RemoveLocalAssets(
const std::string &tableName, const std::string &gid, const Value &prefix, VBucket &assets);
virtual int32_t Cancel();

View File

@ -129,6 +129,12 @@ inline constexpr size_t TYPE_INDEX = Traits::variant_index_of_v<T, Value>;
inline constexpr size_t TYPE_MAX = Traits::variant_size_of_v<Value>;
struct AssetRecord {
std::string gid;
Value prefix;
VBucket assets;
};
enum class QueryOperation : uint32_t {
ILLEGAL = 0,
IN = 1,

View File

@ -16,9 +16,12 @@
#define LOG_TAG "RdbAssetLoader"
#include "rdb_asset_loader.h"
#include <variant>
#include "error/general_error.h"
#include "log_print.h"
#include "rdb_cloud.h"
#include "store/general_value.h"
#include "value_proxy.h"
using namespace DistributedDB;
@ -44,6 +47,75 @@ DBStatus RdbAssetLoader::Download(const std::string &tableName, const std::strin
: CLOUD_RECORD_EXIST_CONFLICT;
}
void RdbAssetLoader::BatchDownload(const std::string &tableName, std::vector<AssetRecord> &downloadAssets)
{
std::vector<AssetsRecord> assetsRecords = Convert(std::move(downloadAssets));
std::set<std::string> skipAssets;
std::set<std::string> deleteAssets;
PostEvent(skipAssets, assetsRecords, DistributedData::AssetEvent::DOWNLOAD, deleteAssets);
assetLoader_->Download(tableName, assetsRecords);
PostEvent(skipAssets, assetsRecords, DistributedData::AssetEvent::DOWNLOAD_FINISHED, deleteAssets);
downloadAssets = Convert(std::move(assetsRecords));
}
std::vector<RdbAssetLoader::AssetsRecord> RdbAssetLoader::Convert(std::vector<AssetRecord> &&downloadAssets)
{
std::vector<AssetsRecord> assetsRecords;
for (auto &assetRecord : downloadAssets) {
AssetsRecord assetsRecord;
assetsRecord.gid = std::move(assetRecord.gid);
DistributedDB::Type prefixTemp = assetRecord.prefix;
assetsRecord.prefix = ValueProxy::Convert(std::move(prefixTemp));
assetsRecord.assets = ValueProxy::Convert(std::move(assetRecord.assets));
assetsRecords.emplace_back(std::move(assetsRecord));
}
return assetsRecords;
}
void RdbAssetLoader::UpdateStatus(AssetRecord &assetRecord, const VBucket &assets)
{
for (const auto &[key, value] : assets) {
auto downloadAssets = std::get_if<DistributedData::Assets>(&value);
if (downloadAssets == nullptr) {
continue;
}
for (const auto &asset : *downloadAssets) {
if (assetRecord.status == DBStatus::OK) {
assetRecord.status = ConvertStatus(static_cast<AssetStatus>(asset.status));
return;
}
}
}
}
std::vector<IAssetLoader::AssetRecord> RdbAssetLoader::Convert(std::vector<AssetsRecord> &&assetsRecords)
{
std::vector<AssetRecord> assetRecords;
for (auto &assetsRecord : assetsRecords) {
AssetRecord assetRecord{
.gid = std::move(assetsRecord.gid),
};
UpdateStatus(assetRecord, assetsRecord.assets);
assetRecord.assets = ValueProxy::Convert(std::move(assetsRecord.assets));
assetRecords.emplace_back(std::move(assetRecord));
}
return assetRecords;
}
DBStatus RdbAssetLoader::ConvertStatus(AssetStatus error)
{
switch (error) {
case AssetStatus::STATUS_NORMAL:
return DBStatus::OK;
case AssetStatus::STATUS_DOWNLOADING:
return DBStatus::OK;
default:
ZLOGE("error:0x%{public}x", error);
break;
}
return DBStatus::CLOUD_ERROR;
}
DBStatus RdbAssetLoader::RemoveLocalAssets(const std::vector<Asset> &assets)
{
DistributedData::VBucket deleteAssets = ValueProxy::Convert(std::map<std::string, Assets>{{ "", assets }});
@ -51,6 +123,20 @@ DBStatus RdbAssetLoader::RemoveLocalAssets(const std::vector<Asset> &assets)
return RdbCloud::ConvertStatus(static_cast<DistributedData::GeneralError>(error));
}
void RdbAssetLoader::PostEvent(std::set<std::string> &skipAssets, std::vector<AssetsRecord> &assetsRecords,
DistributedData::AssetEvent eventId, std::set<std::string> &deleteAssets)
{
for (auto &assetsRecord : assetsRecords) {
for (auto &asset : assetsRecord.assets) {
auto *downLoadAssets = Traits::get_if<DistributedData::Assets>(&asset.second);
if (downLoadAssets == nullptr) {
return;
}
PostEvent(eventId, *downLoadAssets, skipAssets, deleteAssets);
}
}
}
void RdbAssetLoader::PostEvent(std::set<std::string>& skipAssets, std::map<std::string, DistributedData::Value>& assets,
DistributedData::AssetEvent eventId, std::set<std::string>& deleteAssets)
{

View File

@ -29,23 +29,37 @@ public:
using Asset = DistributedDB::Asset;
using DBStatus = DistributedDB::DBStatus;
using BindAssets = DistributedData::BindAssets;
using AssetsRecord = DistributedData::AssetRecord;
using AssetStatus = DistributedData::Asset::Status;
using GeneralError = DistributedData::GeneralError;
using VBucket = DistributedData::VBucket;
explicit RdbAssetLoader(std::shared_ptr<DistributedData::AssetLoader> cloudAssetLoader, BindAssets* bindAssets);
explicit RdbAssetLoader(std::shared_ptr<DistributedData::AssetLoader> cloudAssetLoader, BindAssets *bindAssets);
~RdbAssetLoader() = default;
DBStatus Download(const std::string &tableName, const std::string &gid, const Type &prefix,
std::map<std::string, std::vector<Asset>> &assets) override;
void BatchDownload(const std::string &tableName, std::vector<AssetRecord> &downloadAssets) override;
DBStatus RemoveLocalAssets(const std::vector<Asset> &assets) override;
private:
static std::vector<AssetRecord> Convert(std::vector<AssetsRecord> &&assetsRecords);
static std::vector<AssetsRecord> Convert(std::vector<AssetRecord> &&assetRecords);
static DBStatus ConvertStatus(AssetStatus error);
static void UpdateStatus(AssetRecord &assetRecord, const VBucket &assets);
void PostEvent(std::set<std::string> &skipAssets, std::map<std::string, DistributedData::Value> &assets,
DistributedData::AssetEvent eventId, std::set<std::string> &deleteAssets);
void PostEvent(std::set<std::string> &skipAssets, std::vector<AssetsRecord> &assetsRecords,
DistributedData::AssetEvent eventId, std::set<std::string> &deleteAssets);
void PostEvent(DistributedData::AssetEvent eventId, DistributedData::Assets &assets,
std::set<std::string> &skipAssets, std::set<std::string> &deleteAssets);
std::shared_ptr<DistributedData::AssetLoader> assetLoader_;
BindAssets* snapshots_;
void PostEvent(std::set<std::string>& skipAssets, std::map<std::string, DistributedData::Value>& assets,
DistributedData::AssetEvent eventId, std::set<std::string>& deleteAssets);
void PostEvent(DistributedData::AssetEvent eventId, DistributedData::Assets& assets,
std::set<std::string>& skipAssets, std::set<std::string>& deleteAssets);
BindAssets *snapshots_;
};
} // namespace OHOS::DistributedRdb
#endif // OHOS_DISTRIBUTED_DATA_DATAMGR_SERVICE_RDB_ASSET_LOADER_H

View File

@ -29,6 +29,7 @@ using namespace testing::ext;
using namespace OHOS::DistributedRdb;
using namespace OHOS::DistributedData;
using Type = DistributedDB::Type;
using AssetsRecord = DistributedData::AssetRecord;
const DistributedDB::Asset g_rdbAsset = { .version = 1,
.name = "Phone",
.assetId = "0",
@ -92,6 +93,74 @@ HWTEST_F(RdbAssetLoaderTest, Download, TestSize.Level0)
EXPECT_EQ(result, DistributedDB::DBStatus::OK);
}
/**
* @tc.name: BatchDownload
* @tc.desc: RdbAssetLoader batch download test.
* @tc.type: FUNC
* @tc.require:
* @tc.author:
*/
HWTEST_F(RdbAssetLoaderTest, BatchDownload, TestSize.Level0)
{
BindAssets bindAssets;
auto cloudAssetLoader = std::make_shared<AssetLoader>();
DistributedRdb::RdbAssetLoader rdbAssetLoader(cloudAssetLoader, &bindAssets);
std::string tableName = "testTable";
Type prefix;
std::map<std::string, DistributedDB::Assets> assets;
assets["asset1"].push_back(g_rdbAsset);
std::vector<DistributedDB::IAssetLoader::AssetRecord> assetRecords;
DistributedDB::IAssetLoader::AssetRecord assetRecord { .gid = "gid", .prefix = prefix, .assets = assets };
assetRecords.emplace_back(assetRecord);
rdbAssetLoader.BatchDownload(tableName, assetRecords);
ASSERT_TRUE(!assetRecords.empty());
EXPECT_EQ(assetRecords[0].status, DistributedDB::DBStatus::OK);
}
/**
* @tc.name: Convert001
* @tc.desc: Convert test.
* @tc.type: FUNC
* @tc.require:
* @tc.author:
*/
HWTEST_F(RdbAssetLoaderTest, Convert001, TestSize.Level0)
{
BindAssets bindAssets;
auto cloudAssetLoader = std::make_shared<AssetLoader>();
DistributedRdb::RdbAssetLoader rdbAssetLoader(cloudAssetLoader, &bindAssets);
Type prefix;
std::map<std::string, DistributedDB::Assets> assets;
assets["asset1"].push_back(g_rdbAsset);
std::vector<DistributedDB::IAssetLoader::AssetRecord> assetRecords;
DistributedDB::IAssetLoader::AssetRecord assetRecord { .gid = "gid", .prefix = prefix, .assets = assets };
assetRecords.emplace_back(assetRecord);
assetRecords.emplace_back(assetRecord);
auto assetsRecords = rdbAssetLoader.Convert(std::move(assetRecords));
EXPECT_TRUE(!assetsRecords.empty());
}
/**
* @tc.name: Convert002
* @tc.desc: Convert test.
* @tc.type: FUNC
* @tc.require:
* @tc.author:
*/
HWTEST_F(RdbAssetLoaderTest, Convert002, TestSize.Level0)
{
BindAssets bindAssets;
auto cloudAssetLoader = std::make_shared<AssetLoader>();
DistributedRdb::RdbAssetLoader rdbAssetLoader(cloudAssetLoader, &bindAssets);
Value prefix;
VBucket assets;
std::vector<AssetsRecord> assetsRecords;
AssetRecord assetsRecord { .gid = "gid", .prefix = prefix, .assets = assets };
assetsRecords.emplace_back(assetsRecord);
auto assetRecords = rdbAssetLoader.Convert(std::move(assetsRecords));
EXPECT_TRUE(!assetRecords.empty());
}
/**
* @tc.name: RemoveLocalAssets
* @tc.desc: RdbAssetLoader RemoveLocalAssets test.