mirror of
https://gitee.com/openharmony/distributeddatamgr_datamgr_service
synced 2024-11-27 17:10:42 +00:00
!2153 超时补充PullAsset流程
Merge pull request !2153 from wanghuajian-6/pullAssets
This commit is contained in:
commit
5235cf124b
@ -407,7 +407,6 @@ void ObjectStoreManager::UnregisterRemoteCallback(const std::string &bundleName,
|
||||
void ObjectStoreManager::NotifyChange(std::map<std::string, std::vector<uint8_t>> &changedData)
|
||||
{
|
||||
ZLOGI("OnChange start, size:%{public}zu", changedData.size());
|
||||
std::map<std::string, std::map<std::string, std::vector<uint8_t>>> data;
|
||||
bool hasAsset = false;
|
||||
SaveInfo saveInfo;
|
||||
for (const auto &[key, value] : changedData) {
|
||||
@ -416,6 +415,24 @@ void ObjectStoreManager::NotifyChange(std::map<std::string, std::vector<uint8_t>
|
||||
break;
|
||||
}
|
||||
}
|
||||
auto data = GetObjectData(changedData, saveInfo, hasAsset);
|
||||
if (!hasAsset) {
|
||||
RADAR_REPORT(ObjectStore::DATA_RESTORE, ObjectStore::DATA_RECV, ObjectStore::RADAR_SUCCESS,
|
||||
ObjectStore::BIZ_STATE, ObjectStore::START);
|
||||
callbacks_.ForEach([this, &data](uint32_t tokenId, const CallbackInfo& value) {
|
||||
DoNotify(tokenId, value, data, true); // no asset, data ready means all ready
|
||||
return false;
|
||||
});
|
||||
return;
|
||||
}
|
||||
NotifyDataChanged(data, saveInfo);
|
||||
SaveUserToMeta();
|
||||
}
|
||||
|
||||
std::map<std::string, std::map<std::string, std::vector<uint8_t>>> ObjectStoreManager::GetObjectData(
|
||||
const std::map<std::string, std::vector<uint8_t>>& changedData, SaveInfo& saveInfo, bool& hasAsset)
|
||||
{
|
||||
std::map<std::string, std::map<std::string, std::vector<uint8_t>>> data;
|
||||
std::string keyPrefix = saveInfo.ToPropertyPrefix();
|
||||
if (!keyPrefix.empty()) {
|
||||
std::string observerKey = saveInfo.bundleName + saveInfo.sessionId;
|
||||
@ -432,9 +449,15 @@ void ObjectStoreManager::NotifyChange(std::map<std::string, std::vector<uint8_t>
|
||||
} else {
|
||||
for (const auto &item : changedData) {
|
||||
std::vector<std::string> splitKeys = SplitEntryKey(item.first);
|
||||
if (splitKeys.empty()) {
|
||||
if (splitKeys.size() <= PROPERTY_NAME_INDEX) {
|
||||
continue;
|
||||
}
|
||||
if (saveInfo.sourceDeviceId.empty() || saveInfo.bundleName.empty()) {
|
||||
saveInfo.sourceDeviceId = splitKeys[SOURCE_DEVICE_UDID_INDEX];
|
||||
saveInfo.bundleName = splitKeys[BUNDLE_NAME_INDEX];
|
||||
saveInfo.sessionId = splitKeys[SESSION_ID_INDEX];
|
||||
saveInfo.timestamp = splitKeys[TIME_INDEX];
|
||||
}
|
||||
std::string prefix = splitKeys[BUNDLE_NAME_INDEX] + splitKeys[SESSION_ID_INDEX];
|
||||
std::string propertyName = splitKeys[PROPERTY_NAME_INDEX];
|
||||
data[prefix].insert_or_assign(propertyName, item.second);
|
||||
@ -443,23 +466,13 @@ void ObjectStoreManager::NotifyChange(std::map<std::string, std::vector<uint8_t>
|
||||
}
|
||||
}
|
||||
}
|
||||
if (!hasAsset) {
|
||||
RADAR_REPORT(ObjectStore::DATA_RESTORE, ObjectStore::DATA_RECV, ObjectStore::RADAR_SUCCESS,
|
||||
ObjectStore::BIZ_STATE, ObjectStore::START);
|
||||
callbacks_.ForEach([this, &data](uint32_t tokenId, const CallbackInfo& value) {
|
||||
DoNotify(tokenId, value, data, true); // no asset, data ready means all ready
|
||||
return false;
|
||||
});
|
||||
return;
|
||||
}
|
||||
NotifyDataChanged(data);
|
||||
SaveUserToMeta();
|
||||
return data;
|
||||
}
|
||||
|
||||
void ObjectStoreManager::ComputeStatus(const std::string& objectKey,
|
||||
void ObjectStoreManager::ComputeStatus(const std::string& objectKey, const SaveInfo& saveInfo,
|
||||
const std::map<std::string, std::map<std::string, std::vector<uint8_t>>>& data)
|
||||
{
|
||||
restoreStatus_.Compute(objectKey, [this, &data] (const auto &key, auto &value) {
|
||||
restoreStatus_.Compute(objectKey, [this, &data, saveInfo] (const auto &key, auto &value) {
|
||||
if (value == RestoreStatus::ASSETS_READY) {
|
||||
value = RestoreStatus::ALL_READY;
|
||||
RADAR_REPORT(ObjectStore::DATA_RESTORE, ObjectStore::DATA_RECV, ObjectStore::RADAR_SUCCESS);
|
||||
@ -475,27 +488,30 @@ void ObjectStoreManager::ComputeStatus(const std::string& objectKey,
|
||||
DoNotify(tokenId, value, data, false);
|
||||
return false;
|
||||
});
|
||||
WaitAssets(key);
|
||||
WaitAssets(key, saveInfo, data);
|
||||
}
|
||||
return true;
|
||||
});
|
||||
}
|
||||
|
||||
void ObjectStoreManager::NotifyDataChanged(std::map<std::string, std::map<std::string, std::vector<uint8_t>>>& data)
|
||||
void ObjectStoreManager::NotifyDataChanged(std::map<std::string, std::map<std::string, std::vector<uint8_t>>>& data,
|
||||
const SaveInfo& saveInfo)
|
||||
{
|
||||
for (auto const& [objectKey, results] : data) {
|
||||
restoreStatus_.ComputeIfAbsent(
|
||||
objectKey, [](const std::string& key) -> auto {
|
||||
return RestoreStatus::NONE;
|
||||
});
|
||||
ComputeStatus(objectKey, data);
|
||||
ComputeStatus(objectKey, saveInfo, data);
|
||||
}
|
||||
}
|
||||
|
||||
int32_t ObjectStoreManager::WaitAssets(const std::string& objectKey)
|
||||
int32_t ObjectStoreManager::WaitAssets(const std::string& objectKey, const SaveInfo& saveInfo,
|
||||
const std::map<std::string, std::map<std::string, std::vector<uint8_t>>>& data)
|
||||
{
|
||||
auto taskId = executors_->Schedule(std::chrono::seconds(WAIT_TIME), [this, objectKey]() {
|
||||
ZLOGE("wait assets finisehd timeout, objectKey:%{public}s", objectKey.c_str());
|
||||
auto taskId = executors_->Schedule(std::chrono::seconds(WAIT_TIME), [this, objectKey, data, saveInfo]() {
|
||||
ZLOGE("wait assets finisehd timeout, try pull assets, objectKey:%{public}s", objectKey.c_str());
|
||||
PullAssets(data, saveInfo);
|
||||
DoNotifyWaitAssetTimeout(objectKey);
|
||||
});
|
||||
|
||||
@ -506,6 +522,26 @@ int32_t ObjectStoreManager::WaitAssets(const std::string& objectKey)
|
||||
return OBJECT_SUCCESS;
|
||||
}
|
||||
|
||||
void ObjectStoreManager::PullAssets(const std::map<std::string, std::map<std::string, std::vector<uint8_t>>>& data,
|
||||
const SaveInfo& saveInfo)
|
||||
{
|
||||
std::map<std::string, Assets> changedAssets;
|
||||
for (auto const& [objectId, result] : data) {
|
||||
changedAssets[objectId] = GetAssetsFromDBRecords(result);
|
||||
}
|
||||
for (const auto& [objectId, assets] : changedAssets) {
|
||||
std::string networkId = DmAdaper::GetInstance().ToNetworkID(saveInfo.sourceDeviceId);
|
||||
auto block = std::make_shared<BlockData<std::tuple<bool, bool>>>(WAIT_TIME, std::tuple{ true, true });
|
||||
ObjectAssetLoader::GetInstance()->TransferAssetsAsync(std::stoi(GetCurrentUser()),
|
||||
saveInfo.bundleName, networkId, assets, [this, block](bool success) {
|
||||
block->SetValue({ false, success });
|
||||
});
|
||||
auto [timeout, success] = block->GetValue();
|
||||
ZLOGI("Pull assets end, timeout: %{public}d, success: %{public}d, size:%{public}zu, deviceId: %{public}s",
|
||||
timeout, success, assets.size(), DistributedData::Anonymous::Change(networkId).c_str());
|
||||
}
|
||||
}
|
||||
|
||||
void ObjectStoreManager::NotifyAssetsReady(const std::string& objectKey, const std::string& srcNetworkId)
|
||||
{
|
||||
restoreStatus_.ComputeIfAbsent(
|
||||
|
@ -175,12 +175,19 @@ private:
|
||||
const std::string& assetPrefix);
|
||||
Assets GetAssetsFromDBRecords(const std::map<std::string, std::vector<uint8_t>>& result);
|
||||
bool RegisterAssetsLister();
|
||||
void ComputeStatus(const std::string& objectKey,
|
||||
void ComputeStatus(const std::string& objectKey, const SaveInfo& saveInfo,
|
||||
const std::map<std::string, std::map<std::string, std::vector<uint8_t>>>& data);
|
||||
void NotifyDataChanged(std::map<std::string, std::map<std::string, std::vector<uint8_t>>>& data);
|
||||
void NotifyDataChanged(std::map<std::string, std::map<std::string, std::vector<uint8_t>>>& data,
|
||||
const SaveInfo& saveInfo);
|
||||
int32_t PushAssets(int32_t userId, const std::string &appId, const std::string &sessionId,
|
||||
const std::map<std::string, std::vector<uint8_t>> &data, const std::string &deviceId);
|
||||
int32_t WaitAssets(const std::string& objectKey);
|
||||
int32_t WaitAssets(const std::string& objectKey, const SaveInfo& saveInfo,
|
||||
const std::map<std::string, std::map<std::string, std::vector<uint8_t>>>& data);
|
||||
void PullAssets(const std::map<std::string, std::map<std::string, std::vector<uint8_t>>>& data,
|
||||
const SaveInfo& saveInfo);
|
||||
std::map<std::string, std::map<std::string, std::vector<uint8_t>>> GetObjectData(
|
||||
const std::map<std::string, std::vector<uint8_t>>& changedData, SaveInfo& saveInfo, bool& hasAsset);
|
||||
|
||||
inline std::string GetPropertyPrefix(const std::string &appId, const std::string &sessionId)
|
||||
{
|
||||
return appId + SEPERATOR + sessionId + SEPERATOR + DmAdaper::GetInstance().GetLocalDevice().udid + SEPERATOR;
|
||||
|
@ -228,7 +228,7 @@ HWTEST_F(ObjectManagerTest, NotifyDataChanged001, TestSize.Level0)
|
||||
std::shared_ptr<ExecutorPool> executors = std::make_shared<ExecutorPool>(5, 3); // executor pool
|
||||
manager->SetThreadPool(executors);
|
||||
ASSERT_EQ(manager->restoreStatus_.Find(objectKey).first, false);
|
||||
manager->NotifyDataChanged(data);
|
||||
manager->NotifyDataChanged(data, {});
|
||||
ASSERT_EQ(manager->restoreStatus_.Find(objectKey).second, RestoreStatus::DATA_READY);
|
||||
}
|
||||
|
||||
@ -366,7 +366,7 @@ HWTEST_F(ObjectManagerTest, ComputeStatus001, TestSize.Level0)
|
||||
std::string objectKey="com.example.myapplicaiton123456";
|
||||
std::map<std::string, std::map<std::string, std::vector<uint8_t>>> data{};
|
||||
manager->restoreStatus_.Clear();
|
||||
manager->ComputeStatus(objectKey, data);
|
||||
manager->ComputeStatus(objectKey, {}, data);
|
||||
auto [has0, value0] = manager->restoreStatus_.Find(objectKey);
|
||||
EXPECT_TRUE(has0);
|
||||
EXPECT_EQ(value0, RestoreStatus::DATA_READY);
|
||||
@ -377,7 +377,7 @@ HWTEST_F(ObjectManagerTest, ComputeStatus001, TestSize.Level0)
|
||||
manager->restoreStatus_.Clear();
|
||||
|
||||
manager->restoreStatus_.Insert(objectKey, RestoreStatus::ASSETS_READY);
|
||||
manager->ComputeStatus(objectKey, data);
|
||||
manager->ComputeStatus(objectKey, {}, data);
|
||||
auto [has2, value2] = manager->restoreStatus_.Find(objectKey);
|
||||
EXPECT_TRUE(has2);
|
||||
EXPECT_EQ(value2, RestoreStatus::ALL_READY);
|
||||
|
Loading…
Reference in New Issue
Block a user