!618 【request】支持监听任务的响应头

Merge pull request !618 from 华鑫/master
This commit is contained in:
openharmony_ci 2024-02-23 10:05:04 +00:00 committed by Gitee
commit 8c6e913669
No known key found for this signature in database
GPG Key ID: 173E9B9CA92EEF8F
81 changed files with 2407 additions and 452 deletions

View File

@ -48,6 +48,7 @@ ohos_shared_library("request") {
"src/app_state_callback.cpp",
"src/async_call.cpp",
"src/js_initialize.cpp",
"src/js_response_listener.cpp",
"src/js_task.cpp",
"src/legacy/download_task.cpp",
"src/legacy/request_manager.cpp",
@ -106,6 +107,7 @@ ohos_static_library("request_static") {
"src/app_state_callback.cpp",
"src/async_call.cpp",
"src/js_initialize.cpp",
"src/js_response_listener.cpp",
"src/js_task.cpp",
"src/legacy/download_task.cpp",
"src/legacy/request_manager.cpp",

View File

@ -263,5 +263,13 @@ struct DownloadInfo {
std::string description;
int64_t downloadedBytes;
};
struct Response {
std::string taskId;
std::string version;
int32_t statusCode;
std::string reason;
std::map<std::string, std::vector<std::string>> headers;
};
} // namespace OHOS::Request
#endif //JS_COMMON_H

View File

@ -0,0 +1,50 @@
/*
* Copyright (c) 2024 Huawei Device Co., Ltd.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef OHOS_REQUEST_JS_RESPONSE_LISTENER_H
#define OHOS_REQUEST_JS_RESPONSE_LISTENER_H
#include <list>
#include <string>
#include "i_response_listener.h"
#include "napi/native_api.h"
#include "napi_utils.h"
namespace OHOS::Request {
class JSResponseListener
: public IResponseListener
, public std::enable_shared_from_this<JSResponseListener> {
public:
JSResponseListener(napi_env env, const std::string &taskId) : env_(env), taskId_(taskId)
{
}
napi_status AddListener(napi_value cb);
napi_status RemoveListener(napi_value cb = nullptr);
void OnResponseReceive(const std::shared_ptr<Response> &response) override;
bool HasListener();
private:
bool IsListenerAdded(napi_value cb);
private:
const napi_env env_;
const std::string taskId_;
std::list<napi_ref> allCb_;
};
} // namespace OHOS::Request
#endif // OHOS_REQUEST_JS_RESPONSE_LISTENER_H

View File

@ -18,6 +18,7 @@
#include "async_call.h"
#include "js_common.h"
#include "js_response_listener.h"
#include "request_notify.h"
namespace OHOS::Request {
@ -61,6 +62,7 @@ public:
static std::map<std::string, int32_t> pathMap_;
std::mutex listenerMutex_;
std::map<std::string, std::vector<sptr<RequestNotify>>> listenerMap_;
std::shared_ptr<JSResponseListener> responseListener_;
private:
struct ContextInfo : public AsyncCall::Context {

View File

@ -62,7 +62,9 @@ napi_value Convert2JSValue(napi_env env, const std::vector<TaskState> &taskState
napi_value Convert2JSValue(napi_env env, const Progress &progress);
napi_value Convert2JSValue(napi_env env, TaskInfo &taskInfo);
napi_value Convert2JSValue(napi_env env, Config &config);
napi_value Convert2JSValue(napi_env env, const std::shared_ptr<Response> &response);
napi_value Convert2JSValue(napi_env env, const std::vector<FileSpec> &files, const std::vector<FormItem> &forms);
napi_value Convert2JSHeaders(napi_env env, const std::map<std::string, std::vector<std::string>> &header);
napi_value Convert2JSHeadersAndBody(napi_env env, const std::map<std::string, std::string> &header,
const std::vector<uint8_t> &bodyBytes, bool isSeparate);

View File

@ -0,0 +1,117 @@
/*
* Copyright (C) 2024 Huawei Device Co., Ltd.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "js_response_listener.h"
#include "request_manager.h"
namespace OHOS::Request {
napi_status JSResponseListener::AddListener(napi_value cb)
{
if (this->IsListenerAdded(cb)) {
return napi_ok;
}
napi_ref ref;
napi_status status = napi_create_reference(env_, cb, 1, &ref);
if (status != napi_ok) {
return status;
}
this->allCb_.push_back(ref);
if (this->allCb_.size() == 1) {
RequestManager::GetInstance()->Subscribe(this->taskId_, shared_from_this());
}
return napi_ok;
}
napi_status JSResponseListener::RemoveListener(napi_value cb)
{
if (this->allCb_.empty()) {
return napi_ok;
}
if (cb == nullptr) {
RequestManager::GetInstance()->Unsubscribe(this->taskId_, shared_from_this());
while (!this->allCb_.empty()) {
napi_ref ref = this->allCb_.front();
napi_delete_reference(this->env_, ref);
this->allCb_.pop_front();
}
return napi_ok;
}
for (auto it = this->allCb_.begin(); it != this->allCb_.end(); it++) {
napi_value copyValue = nullptr;
napi_get_reference_value(this->env_, *it, &copyValue);
bool isEquals = false;
napi_strict_equals(this->env_, cb, copyValue, &isEquals);
if (isEquals) {
napi_delete_reference(this->env_, *it);
this->allCb_.erase(it);
break;
}
}
if (this->allCb_.empty()) {
RequestManager::GetInstance()->Unsubscribe(this->taskId_, shared_from_this());
}
return napi_ok;
}
void JSResponseListener::OnResponseReceive(const std::shared_ptr<Response> &response)
{
napi_value value = NapiUtils::Convert2JSValue(this->env_, response);
for (auto it = this->allCb_.begin(); it != this->allCb_.end(); it++) {
napi_handle_scope scope = nullptr;
napi_open_handle_scope(this->env_, &scope);
napi_value callbackFunc = nullptr;
napi_get_reference_value(this->env_, *it, &callbackFunc);
napi_value callbackResult = nullptr;
uint32_t paramNumber = 1;
napi_call_function(this->env_, nullptr, callbackFunc, paramNumber, &value, &callbackResult);
napi_close_handle_scope(this->env_, scope);
}
}
bool JSResponseListener::IsListenerAdded(napi_value cb)
{
if (cb == nullptr) {
return true;
}
for (auto it = this->allCb_.begin(); it != this->allCb_.end(); it++) {
napi_value copyValue = nullptr;
napi_get_reference_value(this->env_, *it, &copyValue);
bool isEquals = false;
napi_strict_equals(this->env_, cb, copyValue, &isEquals);
if (isEquals) {
return true;
}
}
return false;
}
bool JSResponseListener::HasListener()
{
return !this->allCb_.empty();
}
} // namespace OHOS::Request

View File

@ -150,6 +150,8 @@ napi_value JsTask::JsMain(napi_env env, napi_callback_info info, Version version
}
napi_status status = napi_get_reference_value(context->env_, context->taskRef, result);
context->task->SetTid(context->tid);
context->task->responseListener_ =
std::make_shared<JSResponseListener>(context->env_, std::to_string(context->tid));
JsTask::AddTaskMap(std::to_string(context->tid), context->task);
JsTask::AddTaskContextMap(std::to_string(context->tid), context);
napi_value config = nullptr;
@ -366,6 +368,7 @@ bool JsTask::GetTaskOutput(std::shared_ptr<ContextInfo> context)
}
napi_unwrap(context->env_, jsTask, reinterpret_cast<void **>(&context->task));
napi_create_reference(context->env_, jsTask, 1, &(context->taskRef));
context->task->responseListener_ = std::make_shared<JSResponseListener>(context->env_, tid);
JsTask::AddTaskMap(tid, context->task);
JsTask::AddTaskContextMap(tid, context);
}
@ -834,8 +837,13 @@ void JsTask::ReloadListener()
{
REQUEST_HILOGD("ReloadListener in");
std::lock_guard<std::mutex> lockGuard(JsTask::taskMutex_);
RequestManager::GetInstance()->ReopenChannel();
for (const auto &it : taskMap_) {
std::string tid = it.first;
if (it.second->responseListener_->HasListener()) {
RequestManager::GetInstance()->Unsubscribe(tid, it.second->responseListener_);
RequestManager::GetInstance()->Subscribe(tid, it.second->responseListener_);
}
for (auto itListener : it.second->listenerMap_) {
std::string key = itListener.first;
if (key.find(tid) == std::string::npos) {

View File

@ -427,6 +427,63 @@ napi_value Convert2JSValue(napi_env env, Config &config)
return value;
}
napi_value Convert2JSValue(napi_env env, const std::shared_ptr<Response> &response)
{
napi_value value = nullptr;
napi_create_object(env, &value);
napi_set_named_property(env, value, "version", Convert2JSValue(env, response->version));
napi_set_named_property(env, value, "statusCode", Convert2JSValue(env, response->statusCode));
napi_set_named_property(env, value, "reason", Convert2JSValue(env, response->reason));
napi_set_named_property(env, value, "headers", Convert2JSHeaders(env, response->headers));
return value;
}
napi_value Convert2JSHeaders(napi_env env, const std::map<std::string, std::vector<std::string>> &headers)
{
napi_value value = nullptr;
napi_value value2 = nullptr;
napi_value global = nullptr;
napi_value mapConstructor = nullptr;
napi_value mapSet = nullptr;
const uint32_t paramNumber = 2;
napi_value args[paramNumber] = { 0 };
napi_status status = napi_get_global(env, &global);
if (status != napi_ok) {
REQUEST_HILOGE("response napi_get_global failed");
return nullptr;
}
status = napi_get_named_property(env, global, "Map", &mapConstructor);
if (status != napi_ok) {
REQUEST_HILOGE("response map failed");
return nullptr;
}
status = napi_new_instance(env, mapConstructor, 0, nullptr, &value);
if (status != napi_ok) {
REQUEST_HILOGE("response napi_new_instance failed");
return nullptr;
}
status = napi_get_named_property(env, value, "set", &mapSet);
if (status != napi_ok) {
REQUEST_HILOGE("response set failed");
return nullptr;
}
for (const auto &it : headers) {
args[0] = Convert2JSValue(env, it.first);
args[1] = Convert2JSValue(env, it.second);
status = napi_call_function(env, value, mapSet, paramNumber, args, &value2);
if (status != napi_ok) {
REQUEST_HILOGE("response napi_call_function failed, %{public}d", status);
return nullptr;
}
}
return value;
}
std::string GetSaveas(const std::vector<FileSpec> &files, Action action)
{
if (action == Action::UPLOAD) {

View File

@ -17,9 +17,9 @@
#include <thread>
#include "download_server_ipc_interface_code.h"
#include "log.h"
#include "parcel_helper.h"
#include "download_server_ipc_interface_code.h"
#include "request_event.h"
namespace OHOS::Request {

View File

@ -29,6 +29,7 @@ static constexpr const char *EVENT_PROGRESS = "progress";
static constexpr const char *EVENT_HEADERRECEIVE = "headerReceive";
static constexpr const char *EVENT_FAIL = "fail";
static constexpr const char *EVENT_COMPLETE = "complete";
static constexpr const char *EVENT_RESPONSE = "response";
std::unordered_set<std::string> RequestEvent::supportEventsV9_ = {
EVENT_COMPLETE,
@ -46,6 +47,7 @@ std::unordered_set<std::string> RequestEvent::supportEventsV10_ = {
EVENT_PAUSE,
EVENT_RESUME,
EVENT_REMOVE,
EVENT_RESPONSE,
};
std::map<std::string, RequestEvent::Event> RequestEvent::requestEvent_ = {
@ -140,6 +142,16 @@ napi_value RequestEvent::On(napi_env env, napi_callback_info info)
return nullptr;
}
/* on response */
if (jsParam.type.compare(EVENT_RESPONSE) == 0) {
napi_status ret = jsParam.task->responseListener_->AddListener(jsParam.callback);
if (ret != napi_ok) {
REQUEST_HILOGE("AddListener fail");
}
REQUEST_HILOGD("On event %{public}s + %{public}s", jsParam.type.c_str(), jsParam.task->GetTid().c_str());
return nullptr;
}
sptr<RequestNotify> listener = new (std::nothrow) RequestNotify(env, jsParam.callback);
if (listener == nullptr) {
REQUEST_HILOGE("Create callback object fail");
@ -165,6 +177,15 @@ napi_value RequestEvent::Off(napi_env env, napi_callback_info info)
return nullptr;
}
/* off response */
if (jsParam.type.compare(EVENT_RESPONSE) == 0) {
napi_status ret = jsParam.task->responseListener_->RemoveListener(jsParam.callback);
if (ret != napi_ok) {
REQUEST_HILOGE("RemoveListener fail");
}
return nullptr;
}
if (jsParam.callback == nullptr) {
jsParam.task->RemoveListener(jsParam.type, jsParam.task->GetTid(), jsParam.task->config_.version);
} else {

View File

@ -48,8 +48,10 @@ ohos_shared_library("request_native") {
sources = [
"src/parcel_helper.cpp",
"src/request_manager.cpp",
"src/request_manager_impl.cpp",
"src/request_service_proxy.cpp",
"src/request_sync_load_callback.cpp",
"src/response_message_receiver.cpp",
]
deps = [ "../../common:request_common_static" ]
@ -59,6 +61,7 @@ ohos_shared_library("request_native") {
"ability_runtime:data_ability_helper",
"ability_runtime:napi_base_context",
"c_utils:utils",
"eventhandler:libeventhandler",
"hilog:libhilog",
"ipc:ipc_single",
"relational_store:native_dataability",

View File

@ -35,6 +35,9 @@ enum class RequestInterfaceCode {
CMD_SEARCH,
CMD_GETTASK,
CMD_CLEAR,
CMD_OPENCHANNEL,
CMD_SUBSCRIBE,
CMD_UNSUBSCRIBE,
};
enum class RequestNotifyInterfaceCode {

View File

@ -0,0 +1,31 @@
/*
* Copyright (C) 2024 Huawei Device Co., Ltd.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef OHOS_REQUEST_I_RESPONSE_LISTENER_H
#define OHOS_REQUEST_I_RESPONSE_LISTENER_H
#include "js_common.h"
namespace OHOS::Request {
class IResponseListener {
public:
virtual ~IResponseListener() = default;
virtual void OnResponseReceive(const std::shared_ptr<Response> &response) = 0;
};
} // namespace OHOS::Request
#endif // OHOS_REQUEST_I_RESPONSE_LISTENER_H

View File

@ -0,0 +1,30 @@
/*
* Copyright (C) 2024 Huawei Device Co., Ltd.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef OHOS_REQUEST_I_RESPONSE_MESSAGE_HANDLER_H
#define OHOS_REQUEST_I_RESPONSE_MESSAGE_HANDLER_H
#include "i_response_listener.h"
namespace OHOS::Request {
class IResponseMessageHandler : public IResponseListener {
public:
virtual void OnChannelBroken() = 0;
};
} // namespace OHOS::Request
#endif // OHOS_REQUEST_I_RESPONSE_MESSAGE_HANDLER_H

View File

@ -0,0 +1,86 @@
/*
* Copyright (C) 2024 Huawei Device Co., Ltd.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef OHOS_REQUEST_REQUEST_H
#define OHOS_REQUEST_REQUEST_H
#include <set>
#include "i_response_listener.h"
namespace OHOS::Request {
class Request {
public:
static constexpr uint32_t EVENT_NONE = 0;
static constexpr uint32_t EVENT_RESPONSE = (1 << 0);
public:
Request(const std::string &taskId) : taskId_(taskId), events_(0U)
{
}
const std::string &getId() const
{
return this->taskId_;
}
size_t AddListener(const std::shared_ptr<IResponseListener> &listener)
{
this->responseListeners_.emplace(listener);
return this->responseListeners_.size();
}
size_t RemoveListener(const std::shared_ptr<IResponseListener> &listener)
{
this->responseListeners_.erase(listener);
return this->responseListeners_.size();
}
bool IsEventSubscribed(uint32_t eventType)
{
return ((events_ & eventType) == eventType);
}
void MarkEventSubscribed(uint32_t eventType, bool subscribed)
{
if (subscribed) {
events_ |= eventType;
} else {
events_ &= (~eventType);
}
}
bool HasListener() const
{
return !(this->responseListeners_.empty());
}
void OnResponseReceive(const std::shared_ptr<Response> &response)
{
for (auto responseListener : responseListeners_) {
responseListener->OnResponseReceive(response);
}
}
private:
const std::string taskId_;
uint32_t events_;
std::set<std::shared_ptr<IResponseListener>> responseListeners_;
};
} // namespace OHOS::Request
#endif // OHOS_REQUEST_REQUEST_H

View File

@ -13,38 +13,19 @@
* limitations under the License.
*/
#ifndef DOWNLOAD_MANAGER_H
#define DOWNLOAD_MANAGER_H
#ifndef OHOS_REQUEST_DOWNLOAD_MANAGER_H
#define OHOS_REQUEST_DOWNLOAD_MANAGER_H
#include <atomic>
#include <condition_variable>
#include <map>
#include <mutex>
#include "constant.h"
#include "data_ability_helper.h"
#include "iremote_object.h"
#include "iservice_registry.h"
#include "i_response_listener.h"
#include "js_common.h"
#include "notify_stub.h"
#include "refbase.h"
#include "request_service_interface.h"
#include "system_ability_status_change_stub.h"
#include "visibility.h"
namespace OHOS::Request {
class RequestSaDeathRecipient : public IRemoteObject::DeathRecipient {
public:
explicit RequestSaDeathRecipient();
~RequestSaDeathRecipient() = default;
void OnRemoteDied(const wptr<IRemoteObject> &object) override;
};
class RequestManager : public RefBase {
class RequestManager {
public:
RequestManager();
~RequestManager();
REQUEST_API static sptr<RequestManager> GetInstance();
REQUEST_API static const std::unique_ptr<RequestManager> &GetInstance();
REQUEST_API int32_t Create(const Config &config, int32_t &tid, sptr<NotifyInterface> listener);
REQUEST_API int32_t GetTask(const std::string &tid, const std::string &token, Config &config);
REQUEST_API int32_t Start(const std::string &tid);
@ -62,43 +43,20 @@ public:
const std::string &type, const std::string &tid, const sptr<NotifyInterface> &listener, Version version);
REQUEST_API int32_t Off(const std::string &type, const std::string &tid, Version version);
REQUEST_API int32_t Subscribe(const std::string &taskId, const std::shared_ptr<IResponseListener> &listener);
REQUEST_API int32_t Unsubscribe(const std::string &taskId, const std::shared_ptr<IResponseListener> &listener);
REQUEST_API void RestoreListener(void (*callback)());
void OnRemoteSaDied(const wptr<IRemoteObject> &object);
REQUEST_API bool LoadRequestServer();
REQUEST_API bool IsSaReady();
void LoadServerSuccess();
void LoadServerFail();
REQUEST_API void ReopenChannel();
private:
sptr<RequestServiceInterface> GetRequestServiceProxy();
int32_t Retry(int32_t &taskId, const Config &config, int32_t errorCode, sptr<NotifyInterface> listener);
void SetRequestServiceProxy(sptr<RequestServiceInterface> proxy);
bool SubscribeSA(sptr<ISystemAbilityManager> systemAbilityManager);
private:
static std::mutex instanceLock_;
static sptr<RequestManager> instance_;
std::mutex downloadMutex_;
std::mutex conditionMutex_;
std::mutex serviceProxyMutex_;
sptr<RequestServiceInterface> requestServiceProxy_;
sptr<RequestSaDeathRecipient> deathRecipient_;
sptr<ISystemAbilityStatusChange> saChangeListener_;
std::condition_variable syncCon_;
std::atomic<bool> ready_ = false;
static constexpr int LOAD_SA_TIMEOUT_MS = 15000;
void (*callback_)() = nullptr;
private:
class SystemAbilityStatusChangeListener : public OHOS::SystemAbilityStatusChangeStub {
public:
SystemAbilityStatusChangeListener();
~SystemAbilityStatusChangeListener() = default;
virtual void OnAddSystemAbility(int32_t saId, const std::string &deviceId) override;
virtual void OnRemoveSystemAbility(int32_t asId, const std::string &deviceId) override;
};
RequestManager() = default;
RequestManager(const RequestManager &) = delete;
RequestManager(RequestManager &&) = delete;
RequestManager &operator=(const RequestManager &) = delete;
};
} // namespace OHOS::Request
#endif // DOWNLOAD_MANAGER_H
#endif // OHOS_REQUEST_DOWNLOAD_MANAGER_H

View File

@ -0,0 +1,118 @@
/*
* Copyright (C) 2024 Huawei Device Co., Ltd.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef OHOS_REQUEST_DOWNLOAD_MANAGER_IMPL_H
#define OHOS_REQUEST_DOWNLOAD_MANAGER_IMPL_H
#include <atomic>
#include <condition_variable>
#include <map>
#include <mutex>
#include "constant.h"
#include "data_ability_helper.h"
#include "i_response_message_handler.h"
#include "iremote_object.h"
#include "iservice_registry.h"
#include "js_common.h"
#include "notify_stub.h"
#include "refbase.h"
#include "request.h"
#include "request_service_interface.h"
#include "response_message_receiver.h"
#include "system_ability_status_change_stub.h"
#include "visibility.h"
namespace OHOS::Request {
class RequestSaDeathRecipient : public IRemoteObject::DeathRecipient {
public:
explicit RequestSaDeathRecipient();
~RequestSaDeathRecipient() = default;
void OnRemoteDied(const wptr<IRemoteObject> &object) override;
};
class RequestManagerImpl : public IResponseMessageHandler {
public:
static const std::unique_ptr<RequestManagerImpl> &GetInstance();
int32_t Create(const Config &config, int32_t &tid, sptr<NotifyInterface> listener);
int32_t GetTask(const std::string &tid, const std::string &token, Config &config);
int32_t Start(const std::string &tid);
int32_t Stop(const std::string &tid);
int32_t Query(const std::string &tid, TaskInfo &info);
int32_t Touch(const std::string &tid, const std::string &token, TaskInfo &info);
int32_t Search(const Filter &filter, std::vector<std::string> &tids);
int32_t Show(const std::string &tid, TaskInfo &info);
int32_t Pause(const std::string &tid, Version version);
int32_t QueryMimeType(const std::string &tid, std::string &mimeType);
int32_t Remove(const std::string &tid, Version version);
int32_t Resume(const std::string &tid);
int32_t On(const std::string &type, const std::string &tid, const sptr<NotifyInterface> &listener, Version version);
int32_t Off(const std::string &type, const std::string &tid, Version version);
int32_t Subscribe(const std::string &taskId, const std::shared_ptr<IResponseListener> &listener);
int32_t Unsubscribe(const std::string &taskId, const std::shared_ptr<IResponseListener> &listener);
void RestoreListener(void (*callback)());
bool LoadRequestServer();
bool IsSaReady();
void OnRemoteSaDied(const wptr<IRemoteObject> &object);
void LoadServerSuccess();
void LoadServerFail();
void ReopenChannel();
private:
RequestManagerImpl() = default;
RequestManagerImpl(const RequestManagerImpl &) = delete;
RequestManagerImpl(RequestManagerImpl &&) = delete;
RequestManagerImpl &operator=(const RequestManagerImpl &) = delete;
sptr<RequestServiceInterface> GetRequestServiceProxy();
int32_t Retry(int32_t &taskId, const Config &config, int32_t errorCode, sptr<NotifyInterface> listener);
void SetRequestServiceProxy(sptr<RequestServiceInterface> proxy);
bool SubscribeSA(sptr<ISystemAbilityManager> systemAbilityManager);
int32_t EnsureChannelOpen();
std::shared_ptr<Request> GetTask(const std::string &taskId);
void OnChannelBroken() override;
void OnResponseReceive(const std::shared_ptr<Response> &response) override;
private:
static std::mutex instanceLock_;
static sptr<RequestManagerImpl> instance_;
std::mutex downloadMutex_;
std::mutex conditionMutex_;
std::mutex serviceProxyMutex_;
sptr<RequestServiceInterface> requestServiceProxy_;
sptr<RequestSaDeathRecipient> deathRecipient_;
sptr<ISystemAbilityStatusChange> saChangeListener_;
std::condition_variable syncCon_;
std::atomic<bool> ready_ = false;
static constexpr int LOAD_SA_TIMEOUT_MS = 15000;
void (*callback_)() = nullptr;
std::map<std::string, std::shared_ptr<Request>> tasks_;
std::shared_ptr<ResponseMessageReceiver> msgReceiver_;
private:
class SystemAbilityStatusChangeListener : public OHOS::SystemAbilityStatusChangeStub {
public:
SystemAbilityStatusChangeListener();
~SystemAbilityStatusChangeListener() = default;
virtual void OnAddSystemAbility(int32_t saId, const std::string &deviceId) override;
virtual void OnRemoveSystemAbility(int32_t asId, const std::string &deviceId) override;
};
};
} // namespace OHOS::Request
#endif // OHOS_REQUEST_DOWNLOAD_MANAGER_IMPL_H

View File

@ -44,6 +44,10 @@ public:
virtual int32_t On(
const std::string &type, const std::string &tid, const sptr<NotifyInterface> &listener, Version version) = 0;
virtual int32_t Off(const std::string &type, const std::string &tid, Version version) = 0;
virtual int32_t OpenChannel(int32_t &sockFd) = 0;
virtual int32_t Subscribe(const std::string &taskId, int32_t cbType) = 0;
virtual int32_t Unsubscribe(const std::string &taskId, int32_t cbType) = 0;
};
} // namespace OHOS::Request
#endif // DOWNLOAD_SERVICE_INTERFACE_H

View File

@ -45,6 +45,10 @@ public:
Version version) override;
int32_t Off(const std::string &type, const std::string &tid, Version version) override;
int32_t OpenChannel(int32_t &sockFd) override;
int32_t Subscribe(const std::string &taskId, int32_t cbType) override;
int32_t Unsubscribe(const std::string &taskId, int32_t cbType) override;
private:
static void GetVectorData(const Config &config, MessageParcel &data);
static inline BrokerDelegator<RequestServiceProxy> delegator_;

View File

@ -0,0 +1,53 @@
/*
* Copyright (C) 2024 Huawei Device Co., Ltd.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef OHOS_REQUEST_RESPONSE_MESSAGE_RECEIVER_H
#define OHOS_REQUEST_RESPONSE_MESSAGE_RECEIVER_H
#include "event_handler.h"
#include "event_runner.h"
#include "i_response_message_handler.h"
namespace OHOS::Request {
enum MessageType {
HTTP_RESPONSE = 0,
};
class ResponseMessageReceiver
: public OHOS::AppExecFwk::FileDescriptorListener
, public std::enable_shared_from_this<ResponseMessageReceiver> {
public:
static constexpr uint32_t RESPONSE_MAX_SIZE = 8 * 1024;
static constexpr uint32_t RESPONSE_MAGIC_NUM = 0x43434646;
ResponseMessageReceiver(IResponseMessageHandler *handler, int32_t sockFd);
void BeginReceive();
void Shutdown(void);
private:
void OnReadable(int32_t fd) override;
void OnShutdown(int32_t fd) override;
void OnException(int32_t fd) override;
private:
IResponseMessageHandler *handler_;
int32_t sockFd_{ -1 };
int32_t messageId_{ 1 };
};
} // namespace OHOS::Request
#endif // OHOS_REQUEST_RESPONSE_MESSAGE_RECEIVER_H

View File

@ -15,402 +15,112 @@
#include "request_manager.h"
#include <atomic>
#include "data_ability_predicates.h"
#include "log.h"
#include "rdb_errno.h"
#include "rdb_helper.h"
#include "rdb_open_callback.h"
#include "rdb_predicates.h"
#include "rdb_store.h"
#include "request_sync_load_callback.h"
#include "result_set.h"
#include "system_ability_definition.h"
#include "request_manager_impl.h"
namespace OHOS::Request {
std::mutex RequestManager::instanceLock_;
sptr<RequestManager> RequestManager::instance_ = nullptr;
constexpr const int32_t RETRY_INTERVAL = 500 * 1000;
constexpr const int32_t RETRY_MAX_TIMES = 5;
RequestManager::RequestManager() : requestServiceProxy_(nullptr), deathRecipient_(nullptr), saChangeListener_(nullptr)
const std::unique_ptr<RequestManager> &RequestManager::GetInstance()
{
}
RequestManager::~RequestManager()
{
}
sptr<RequestManager> RequestManager::GetInstance()
{
if (instance_ == nullptr) {
std::lock_guard<std::mutex> autoLock(instanceLock_);
if (instance_ == nullptr) {
instance_ = new RequestManager;
}
}
return instance_;
static std::unique_ptr<RequestManager> instance(new RequestManager());
return instance;
}
int32_t RequestManager::Create(const Config &config, int32_t &tid, sptr<NotifyInterface> listener)
{
REQUEST_HILOGD("RequestManager Create start.");
auto proxy = GetRequestServiceProxy();
if (proxy == nullptr) {
REQUEST_HILOGE("GetRequestServiceProxy fail.");
return E_SERVICE_ERROR;
}
int32_t ret = proxy->Create(config, tid, listener);
if (ret == E_UNLOADING_SA) {
REQUEST_HILOGE("Service ability is quitting");
return Retry(tid, config, ret, listener);
}
REQUEST_HILOGD("RequestManager Create end.");
return ret;
return RequestManagerImpl::GetInstance()->Create(config, tid, listener);
}
int32_t RequestManager::Retry(int32_t &taskId, const Config &config, int32_t errorCode, sptr<NotifyInterface> listener)
{
REQUEST_HILOGD("Retry in");
int32_t interval = 1;
while (errorCode == E_UNLOADING_SA && interval <= RETRY_MAX_TIMES) {
if (config.action == Action::DOWNLOAD) {
for (auto file : config.files) {
std::remove(file.uri.c_str());
}
}
if (errorCode == E_UNLOADING_SA) {
// Waitting for system ability quit
usleep(RETRY_INTERVAL);
}
SetRequestServiceProxy(nullptr);
LoadRequestServer();
auto proxy = GetRequestServiceProxy();
if (proxy == nullptr) {
REQUEST_HILOGE("proxy is nullptr!");
continue;
}
errorCode = proxy->Create(config, taskId, listener);
++interval;
}
if (errorCode != E_OK && config.action == Action::DOWNLOAD) {
for (auto file : config.files) {
std::remove(file.uri.c_str());
}
}
return errorCode;
}
void RequestManager::SetRequestServiceProxy(sptr<RequestServiceInterface> proxy)
{
std::lock_guard<std::mutex> lock(serviceProxyMutex_);
requestServiceProxy_ = proxy;
}
int32_t RequestManager::GetTask(const std::string &tid, const std::string &token, Config &config)
{
REQUEST_HILOGD("GetTask in");
auto proxy = GetRequestServiceProxy();
if (proxy == nullptr) {
return E_SERVICE_ERROR;
}
return proxy->GetTask(tid, token, config);
return RequestManagerImpl::GetInstance()->GetTask(tid, token, config);
}
int32_t RequestManager::Start(const std::string &tid)
{
REQUEST_HILOGD("Start in");
auto proxy = GetRequestServiceProxy();
if (proxy == nullptr) {
return E_SERVICE_ERROR;
}
return proxy->Start(tid);
return RequestManagerImpl::GetInstance()->Start(tid);
}
int32_t RequestManager::Stop(const std::string &tid)
{
REQUEST_HILOGD("Stop in");
auto proxy = GetRequestServiceProxy();
if (proxy == nullptr) {
return E_SERVICE_ERROR;
}
return proxy->Stop(tid);
return RequestManagerImpl::GetInstance()->Stop(tid);
}
int32_t RequestManager::Query(const std::string &tid, TaskInfo &info)
{
REQUEST_HILOGD("Query in");
auto proxy = GetRequestServiceProxy();
if (proxy == nullptr) {
return E_SERVICE_ERROR;
}
return proxy->Query(tid, info);
return RequestManagerImpl::GetInstance()->Query(tid, info);
}
int32_t RequestManager::Touch(const std::string &tid, const std::string &token, TaskInfo &info)
{
REQUEST_HILOGD("Touch in");
auto proxy = GetRequestServiceProxy();
if (proxy == nullptr) {
return E_SERVICE_ERROR;
}
return proxy->Touch(tid, token, info);
return RequestManagerImpl::GetInstance()->Touch(tid, token, info);
}
int32_t RequestManager::Search(const Filter &filter, std::vector<std::string> &tids)
{
REQUEST_HILOGD("Search in");
auto proxy = GetRequestServiceProxy();
if (proxy == nullptr) {
return E_SERVICE_ERROR;
}
return proxy->Search(filter, tids);
return RequestManagerImpl::GetInstance()->Search(filter, tids);
}
int32_t RequestManager::Show(const std::string &tid, TaskInfo &info)
{
REQUEST_HILOGD("Show in");
auto proxy = GetRequestServiceProxy();
if (proxy == nullptr) {
return E_SERVICE_ERROR;
}
return proxy->Show(tid, info);
return RequestManagerImpl::GetInstance()->Show(tid, info);
}
int32_t RequestManager::Pause(const std::string &tid, Version version)
{
REQUEST_HILOGD("Pause in");
auto proxy = GetRequestServiceProxy();
if (proxy == nullptr) {
return E_SERVICE_ERROR;
}
return proxy->Pause(tid, version);
return RequestManagerImpl::GetInstance()->Pause(tid, version);
}
int32_t RequestManager::QueryMimeType(const std::string &tid, std::string &mimeType)
{
REQUEST_HILOGD("QueryMimeType in");
auto proxy = GetRequestServiceProxy();
if (proxy == nullptr) {
return E_SERVICE_ERROR;
}
return proxy->QueryMimeType(tid, mimeType);
return RequestManagerImpl::GetInstance()->QueryMimeType(tid, mimeType);
}
int32_t RequestManager::Remove(const std::string &tid, Version version)
{
REQUEST_HILOGD("Remove in");
auto proxy = GetRequestServiceProxy();
if (proxy == nullptr) {
return E_SERVICE_ERROR;
}
return proxy->Remove(tid, version);
return RequestManagerImpl::GetInstance()->Remove(tid, version);
}
int32_t RequestManager::Resume(const std::string &tid)
{
REQUEST_HILOGD("Resume in");
auto proxy = GetRequestServiceProxy();
if (proxy == nullptr) {
return E_SERVICE_ERROR;
}
return proxy->Resume(tid);
return RequestManagerImpl::GetInstance()->Resume(tid);
}
int32_t RequestManager::On(
const std::string &type, const std::string &tid, const sptr<NotifyInterface> &listener, Version version)
{
REQUEST_HILOGD("On in");
auto proxy = GetRequestServiceProxy();
if (proxy == nullptr) {
return false;
}
return proxy->On(type, tid, listener, version);
return RequestManagerImpl::GetInstance()->On(type, tid, listener, version);
}
int32_t RequestManager::Off(const std::string &type, const std::string &tid, Version version)
{
REQUEST_HILOGD("Off in");
auto proxy = GetRequestServiceProxy();
if (proxy == nullptr) {
return false;
}
return proxy->Off(type, tid, version);
return RequestManagerImpl::GetInstance()->Off(type, tid, version);
}
sptr<RequestServiceInterface> RequestManager::GetRequestServiceProxy()
int32_t RequestManager::Subscribe(const std::string &taskId, const std::shared_ptr<IResponseListener> &listener)
{
std::lock_guard<std::mutex> lock(serviceProxyMutex_);
if (requestServiceProxy_ != nullptr) {
return requestServiceProxy_;
}
sptr<ISystemAbilityManager> systemAbilityManager =
SystemAbilityManagerClient::GetInstance().GetSystemAbilityManager();
if (systemAbilityManager == nullptr) {
REQUEST_HILOGE("Getting SystemAbilityManager failed.");
return nullptr;
}
auto systemAbility = systemAbilityManager->GetSystemAbility(DOWNLOAD_SERVICE_ID, "");
if (systemAbility == nullptr) {
REQUEST_HILOGE("Get SystemAbility failed.");
return nullptr;
}
if (!SubscribeSA(systemAbilityManager)) {
REQUEST_HILOGE("Subscribe SystemAbility failed.");
return nullptr;
}
deathRecipient_ = new RequestSaDeathRecipient();
systemAbility->AddDeathRecipient(deathRecipient_);
requestServiceProxy_ = iface_cast<RequestServiceInterface>(systemAbility);
if (requestServiceProxy_ == nullptr) {
REQUEST_HILOGE("Get requestServiceProxy_ fail.");
return nullptr;
}
return requestServiceProxy_;
return RequestManagerImpl::GetInstance()->Subscribe(taskId, listener);
}
// Subscribe SA status changes only once
bool RequestManager::SubscribeSA(sptr<ISystemAbilityManager> systemAbilityManager)
int32_t RequestManager::Unsubscribe(const std::string &taskId, const std::shared_ptr<IResponseListener> &listener)
{
if (saChangeListener_ != nullptr) {
return true;
}
saChangeListener_ = new (std::nothrow) SystemAbilityStatusChangeListener();
if (saChangeListener_ == nullptr) {
REQUEST_HILOGE("Get saChangeListener_ failed.");
return false;
}
if (systemAbilityManager->SubscribeSystemAbility(DOWNLOAD_SERVICE_ID, saChangeListener_) != E_OK) {
REQUEST_HILOGE("SubscribeSystemAbility failed.");
return false;
}
return true;
return RequestManagerImpl::GetInstance()->Unsubscribe(taskId, listener);
}
void RequestManager::RestoreListener(void (*callback)())
{
callback_ = callback;
}
RequestManager::SystemAbilityStatusChangeListener::SystemAbilityStatusChangeListener()
{
}
void RequestManager::SystemAbilityStatusChangeListener::OnAddSystemAbility(int32_t saId, const std::string &deviceId)
{
if (saId != DOWNLOAD_SERVICE_ID) {
REQUEST_HILOGE("SA ID is not DOWNLOAD_SERVICE_ID.");
}
REQUEST_HILOGD("SystemAbility Add.");
if (RequestManager::GetInstance()->callback_ != nullptr) {
RequestManager::GetInstance()->callback_();
}
}
void RequestManager::SystemAbilityStatusChangeListener::OnRemoveSystemAbility(int32_t saId, const std::string &deviceId)
{
if (saId != DOWNLOAD_SERVICE_ID) {
REQUEST_HILOGE("SA ID is not DOWNLOAD_SERVICE_ID.");
}
REQUEST_HILOGD("SystemAbility Remove.");
}
void RequestManager::OnRemoteSaDied(const wptr<IRemoteObject> &remote)
{
REQUEST_HILOGD(" RequestManager::OnRemoteSaDied");
ready_.store(false);
SetRequestServiceProxy(nullptr);
}
RequestSaDeathRecipient::RequestSaDeathRecipient()
{
}
void RequestSaDeathRecipient::OnRemoteDied(const wptr<IRemoteObject> &object)
{
REQUEST_HILOGI("RequestSaDeathRecipient on remote systemAbility died.");
RequestManager::GetInstance()->OnRemoteSaDied(object);
return RequestManagerImpl::GetInstance()->RestoreListener(callback);
}
bool RequestManager::LoadRequestServer()
{
REQUEST_HILOGD("Begin load request server");
if (ready_.load()) {
REQUEST_HILOGD("GetSystemAbilityManager ready_ true");
return true;
}
std::lock_guard<std::mutex> lock(downloadMutex_);
if (ready_.load()) {
REQUEST_HILOGD("GetSystemAbilityManager ready_ is true");
return true;
}
auto sm = SystemAbilityManagerClient::GetInstance().GetSystemAbilityManager();
if (sm == nullptr) {
REQUEST_HILOGE("GetSystemAbilityManager return null");
return false;
}
auto systemAbility = sm->CheckSystemAbility(DOWNLOAD_SERVICE_ID);
if (systemAbility != nullptr) {
REQUEST_HILOGI("service already exists");
return true;
}
sptr<RequestSyncLoadCallback> loadCallback_ = new (std::nothrow) RequestSyncLoadCallback();
if (loadCallback_ == nullptr) {
REQUEST_HILOGE("new DownloadAbilityCallback fail");
return false;
}
int32_t result = sm->LoadSystemAbility(DOWNLOAD_SERVICE_ID, loadCallback_);
if (result != ERR_OK) {
REQUEST_HILOGE("LoadSystemAbility %{public}d failed, result: %{public}d", DOWNLOAD_SERVICE_ID, result);
return false;
}
{
std::unique_lock<std::mutex> conditionLock(conditionMutex_);
auto waitStatus = syncCon_.wait_for(
conditionLock, std::chrono::milliseconds(LOAD_SA_TIMEOUT_MS), [this]() { return ready_.load(); });
if (!waitStatus) {
REQUEST_HILOGE("download server load sa timeout");
return false;
}
}
return true;
return RequestManagerImpl::GetInstance()->LoadRequestServer();
}
bool RequestManager::IsSaReady()
{
return ready_.load();
return RequestManagerImpl::GetInstance()->IsSaReady();
}
void RequestManager::LoadServerSuccess()
void RequestManager::ReopenChannel()
{
std::unique_lock<std::mutex> lock(conditionMutex_);
ready_.store(true);
syncCon_.notify_one();
REQUEST_HILOGI("load download server success");
return RequestManagerImpl::GetInstance()->ReopenChannel();
}
void RequestManager::LoadServerFail()
{
ready_.store(false);
REQUEST_HILOGE("load download server fail");
}
} // namespace OHOS::Request
} // namespace OHOS::Request

View File

@ -0,0 +1,512 @@
/*
* Copyright (C) 2024 Huawei Device Co., Ltd.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "request_manager_impl.h"
#include <atomic>
#include <memory>
#include "data_ability_predicates.h"
#include "errors.h"
#include "log.h"
#include "rdb_errno.h"
#include "rdb_helper.h"
#include "rdb_open_callback.h"
#include "rdb_predicates.h"
#include "rdb_store.h"
#include "request_manager.h"
#include "request_sync_load_callback.h"
#include "response_message_receiver.h"
#include "result_set.h"
#include "system_ability_definition.h"
namespace OHOS::Request {
constexpr const int32_t RETRY_INTERVAL = 500 * 1000;
constexpr const int32_t RETRY_MAX_TIMES = 5;
const std::unique_ptr<RequestManagerImpl> &RequestManagerImpl::GetInstance()
{
static std::unique_ptr<RequestManagerImpl> instance(new RequestManagerImpl());
return instance;
}
int32_t RequestManagerImpl::Create(const Config &config, int32_t &tid, sptr<NotifyInterface> listener)
{
REQUEST_HILOGD("RequestManagerImpl Create start.");
auto proxy = GetRequestServiceProxy();
if (proxy == nullptr) {
REQUEST_HILOGE("GetRequestServiceProxy fail.");
return E_SERVICE_ERROR;
}
int32_t ret = proxy->Create(config, tid, listener);
if (ret == E_UNLOADING_SA) {
REQUEST_HILOGE("Service ability is quitting");
return Retry(tid, config, ret, listener);
}
REQUEST_HILOGD("RequestManagerImpl Create end.");
return ret;
}
int32_t RequestManagerImpl::Retry(
int32_t &taskId, const Config &config, int32_t errorCode, sptr<NotifyInterface> listener)
{
REQUEST_HILOGD("Retry in");
int32_t interval = 1;
while (errorCode == E_UNLOADING_SA && interval <= RETRY_MAX_TIMES) {
if (config.action == Action::DOWNLOAD) {
for (auto file : config.files) {
std::remove(file.uri.c_str());
}
}
if (errorCode == E_UNLOADING_SA) {
// Waitting for system ability quit
usleep(RETRY_INTERVAL);
}
SetRequestServiceProxy(nullptr);
LoadRequestServer();
auto proxy = GetRequestServiceProxy();
if (proxy == nullptr) {
REQUEST_HILOGE("proxy is nullptr!");
continue;
}
errorCode = proxy->Create(config, taskId, listener);
++interval;
}
if (errorCode != E_OK && config.action == Action::DOWNLOAD) {
for (auto file : config.files) {
std::remove(file.uri.c_str());
}
}
return errorCode;
}
void RequestManagerImpl::SetRequestServiceProxy(sptr<RequestServiceInterface> proxy)
{
std::lock_guard<std::mutex> lock(serviceProxyMutex_);
requestServiceProxy_ = proxy;
}
int32_t RequestManagerImpl::GetTask(const std::string &tid, const std::string &token, Config &config)
{
REQUEST_HILOGD("GetTask in");
auto proxy = GetRequestServiceProxy();
if (proxy == nullptr) {
return E_SERVICE_ERROR;
}
return proxy->GetTask(tid, token, config);
}
int32_t RequestManagerImpl::Start(const std::string &tid)
{
REQUEST_HILOGD("Start in");
auto proxy = GetRequestServiceProxy();
if (proxy == nullptr) {
return E_SERVICE_ERROR;
}
return proxy->Start(tid);
}
int32_t RequestManagerImpl::Stop(const std::string &tid)
{
REQUEST_HILOGD("Stop in");
auto proxy = GetRequestServiceProxy();
if (proxy == nullptr) {
return E_SERVICE_ERROR;
}
return proxy->Stop(tid);
}
int32_t RequestManagerImpl::Query(const std::string &tid, TaskInfo &info)
{
REQUEST_HILOGD("Query in");
auto proxy = GetRequestServiceProxy();
if (proxy == nullptr) {
return E_SERVICE_ERROR;
}
return proxy->Query(tid, info);
}
int32_t RequestManagerImpl::Touch(const std::string &tid, const std::string &token, TaskInfo &info)
{
REQUEST_HILOGD("Touch in");
auto proxy = GetRequestServiceProxy();
if (proxy == nullptr) {
return E_SERVICE_ERROR;
}
return proxy->Touch(tid, token, info);
}
int32_t RequestManagerImpl::Search(const Filter &filter, std::vector<std::string> &tids)
{
REQUEST_HILOGD("Search in");
auto proxy = GetRequestServiceProxy();
if (proxy == nullptr) {
return E_SERVICE_ERROR;
}
return proxy->Search(filter, tids);
}
int32_t RequestManagerImpl::Show(const std::string &tid, TaskInfo &info)
{
REQUEST_HILOGD("Show in");
auto proxy = GetRequestServiceProxy();
if (proxy == nullptr) {
return E_SERVICE_ERROR;
}
return proxy->Show(tid, info);
}
int32_t RequestManagerImpl::Pause(const std::string &tid, Version version)
{
REQUEST_HILOGD("Pause in");
auto proxy = GetRequestServiceProxy();
if (proxy == nullptr) {
return E_SERVICE_ERROR;
}
return proxy->Pause(tid, version);
}
int32_t RequestManagerImpl::QueryMimeType(const std::string &tid, std::string &mimeType)
{
REQUEST_HILOGD("QueryMimeType in");
auto proxy = GetRequestServiceProxy();
if (proxy == nullptr) {
return E_SERVICE_ERROR;
}
return proxy->QueryMimeType(tid, mimeType);
}
int32_t RequestManagerImpl::Remove(const std::string &tid, Version version)
{
REQUEST_HILOGD("Remove in");
auto proxy = GetRequestServiceProxy();
if (proxy == nullptr) {
return E_SERVICE_ERROR;
}
return proxy->Remove(tid, version);
}
int32_t RequestManagerImpl::Resume(const std::string &tid)
{
REQUEST_HILOGD("Resume in");
auto proxy = GetRequestServiceProxy();
if (proxy == nullptr) {
return E_SERVICE_ERROR;
}
return proxy->Resume(tid);
}
int32_t RequestManagerImpl::On(
const std::string &type, const std::string &tid, const sptr<NotifyInterface> &listener, Version version)
{
REQUEST_HILOGD("On in");
auto proxy = GetRequestServiceProxy();
if (proxy == nullptr) {
return false;
}
return proxy->On(type, tid, listener, version);
}
int32_t RequestManagerImpl::Off(const std::string &type, const std::string &tid, Version version)
{
REQUEST_HILOGD("Off in");
auto proxy = GetRequestServiceProxy();
if (proxy == nullptr) {
return false;
}
return proxy->Off(type, tid, version);
}
int32_t RequestManagerImpl::Subscribe(const std::string &taskId, const std::shared_ptr<IResponseListener> &listener)
{
REQUEST_HILOGD("Subscribe in");
auto proxy = GetRequestServiceProxy();
if (proxy == nullptr) {
return false;
}
std::shared_ptr<Request> task = this->GetTask(taskId);
task->AddListener(listener);
if (task->IsEventSubscribed(Request::EVENT_RESPONSE)) {
return E_OK;
}
this->EnsureChannelOpen();
proxy->Subscribe(taskId, Request::EVENT_RESPONSE);
task->MarkEventSubscribed(Request::EVENT_RESPONSE, true);
return E_OK;
}
int32_t RequestManagerImpl::Unsubscribe(const std::string &taskId, const std::shared_ptr<IResponseListener> &listener)
{
REQUEST_HILOGD("Unsubscribe in");
auto proxy = GetRequestServiceProxy();
if (proxy == nullptr) {
return false;
}
std::shared_ptr<Request> task = this->GetTask(taskId);
size_t size = task->RemoveListener(listener);
if (size != 0U) {
return E_OK;
}
if (!task->IsEventSubscribed(Request::EVENT_RESPONSE)) {
return E_OK;
}
proxy->Unsubscribe(taskId, Request::EVENT_RESPONSE);
task->MarkEventSubscribed(Request::EVENT_RESPONSE, false);
return E_OK;
}
int32_t RequestManagerImpl::EnsureChannelOpen()
{
if (msgReceiver_) {
return E_OK;
}
auto proxy = GetRequestServiceProxy();
if (proxy == nullptr) {
return false;
}
int32_t sockFd = -1;
int32_t ret = proxy->OpenChannel(sockFd);
if (ret != E_OK) {
return ret;
}
msgReceiver_ = std::make_shared<ResponseMessageReceiver>(this, sockFd);
msgReceiver_->BeginReceive();
return E_OK;
}
std::shared_ptr<Request> RequestManagerImpl::GetTask(const std::string &taskId)
{
auto it = tasks_.find(taskId);
if (it != tasks_.end()) {
return it->second;
}
auto retPair = this->tasks_.emplace(taskId, std::make_shared<Request>(Request(taskId)));
if (retPair.second) {
return retPair.first->second;
}
REQUEST_HILOGE("Response Task create fail");
return std::shared_ptr<Request>();
}
void RequestManagerImpl::OnChannelBroken()
{
this->msgReceiver_.reset();
}
void RequestManagerImpl::OnResponseReceive(const std::shared_ptr<Response> &response)
{
auto it = tasks_.find(response->taskId);
if (it == tasks_.end()) {
REQUEST_HILOGE("task not found");
return;
}
it->second->OnResponseReceive(response);
}
sptr<RequestServiceInterface> RequestManagerImpl::GetRequestServiceProxy()
{
std::lock_guard<std::mutex> lock(serviceProxyMutex_);
if (requestServiceProxy_ != nullptr) {
return requestServiceProxy_;
}
sptr<ISystemAbilityManager> systemAbilityManager =
SystemAbilityManagerClient::GetInstance().GetSystemAbilityManager();
if (systemAbilityManager == nullptr) {
REQUEST_HILOGE("Getting SystemAbilityManager failed.");
return nullptr;
}
auto systemAbility = systemAbilityManager->GetSystemAbility(DOWNLOAD_SERVICE_ID, "");
if (systemAbility == nullptr) {
REQUEST_HILOGE("Get SystemAbility failed.");
return nullptr;
}
if (!SubscribeSA(systemAbilityManager)) {
REQUEST_HILOGE("Subscribe SystemAbility failed.");
return nullptr;
}
deathRecipient_ = new RequestSaDeathRecipient();
systemAbility->AddDeathRecipient(deathRecipient_);
requestServiceProxy_ = iface_cast<RequestServiceInterface>(systemAbility);
if (requestServiceProxy_ == nullptr) {
REQUEST_HILOGE("Get requestServiceProxy_ fail.");
return nullptr;
}
return requestServiceProxy_;
}
// Subscribe SA status changes only once
bool RequestManagerImpl::SubscribeSA(sptr<ISystemAbilityManager> systemAbilityManager)
{
if (saChangeListener_ != nullptr) {
return true;
}
saChangeListener_ = new (std::nothrow) SystemAbilityStatusChangeListener();
if (saChangeListener_ == nullptr) {
REQUEST_HILOGE("Get saChangeListener_ failed.");
return false;
}
if (systemAbilityManager->SubscribeSystemAbility(DOWNLOAD_SERVICE_ID, saChangeListener_) != E_OK) {
REQUEST_HILOGE("SubscribeSystemAbility failed.");
return false;
}
return true;
}
void RequestManagerImpl::RestoreListener(void (*callback)())
{
callback_ = callback;
}
RequestManagerImpl::SystemAbilityStatusChangeListener::SystemAbilityStatusChangeListener()
{
}
void RequestManagerImpl::SystemAbilityStatusChangeListener::OnAddSystemAbility(
int32_t saId, const std::string &deviceId)
{
if (saId != DOWNLOAD_SERVICE_ID) {
REQUEST_HILOGE("SA ID is not DOWNLOAD_SERVICE_ID.");
}
REQUEST_HILOGD("SystemAbility Add.");
if (RequestManagerImpl::GetInstance()->callback_ != nullptr) {
RequestManagerImpl::GetInstance()->callback_();
}
}
void RequestManagerImpl::SystemAbilityStatusChangeListener::OnRemoveSystemAbility(
int32_t saId, const std::string &deviceId)
{
if (saId != DOWNLOAD_SERVICE_ID) {
REQUEST_HILOGE("SA ID is not DOWNLOAD_SERVICE_ID.");
}
REQUEST_HILOGD("SystemAbility Remove.");
}
void RequestManagerImpl::OnRemoteSaDied(const wptr<IRemoteObject> &remote)
{
REQUEST_HILOGD(" RequestManagerImpl::OnRemoteSaDied");
ready_.store(false);
SetRequestServiceProxy(nullptr);
}
RequestSaDeathRecipient::RequestSaDeathRecipient()
{
}
void RequestSaDeathRecipient::OnRemoteDied(const wptr<IRemoteObject> &object)
{
REQUEST_HILOGI("RequestSaDeathRecipient on remote systemAbility died.");
RequestManagerImpl::GetInstance()->OnRemoteSaDied(object);
}
bool RequestManagerImpl::LoadRequestServer()
{
REQUEST_HILOGD("Begin load request server");
if (ready_.load()) {
REQUEST_HILOGD("GetSystemAbilityManager ready_ true");
return true;
}
std::lock_guard<std::mutex> lock(downloadMutex_);
if (ready_.load()) {
REQUEST_HILOGD("GetSystemAbilityManager ready_ is true");
return true;
}
auto sm = SystemAbilityManagerClient::GetInstance().GetSystemAbilityManager();
if (sm == nullptr) {
REQUEST_HILOGE("GetSystemAbilityManager return null");
return false;
}
auto systemAbility = sm->CheckSystemAbility(DOWNLOAD_SERVICE_ID);
if (systemAbility != nullptr) {
REQUEST_HILOGI("service already exists");
return true;
}
sptr<RequestSyncLoadCallback> loadCallback_ = new (std::nothrow) RequestSyncLoadCallback();
if (loadCallback_ == nullptr) {
REQUEST_HILOGE("new DownloadAbilityCallback fail");
return false;
}
int32_t result = sm->LoadSystemAbility(DOWNLOAD_SERVICE_ID, loadCallback_);
if (result != E_OK) {
REQUEST_HILOGE("LoadSystemAbility %{public}d failed, result: %{public}d", DOWNLOAD_SERVICE_ID, result);
return false;
}
{
std::unique_lock<std::mutex> conditionLock(conditionMutex_);
auto waitStatus = syncCon_.wait_for(
conditionLock, std::chrono::milliseconds(LOAD_SA_TIMEOUT_MS), [this]() { return ready_.load(); });
if (!waitStatus) {
REQUEST_HILOGE("download server load sa timeout");
return false;
}
}
return true;
}
bool RequestManagerImpl::IsSaReady()
{
return ready_.load();
}
void RequestManagerImpl::LoadServerSuccess()
{
std::unique_lock<std::mutex> lock(conditionMutex_);
ready_.store(true);
syncCon_.notify_one();
REQUEST_HILOGI("load download server success");
}
void RequestManagerImpl::LoadServerFail()
{
ready_.store(false);
REQUEST_HILOGE("load download server fail");
}
void RequestManagerImpl::ReopenChannel()
{
if (!msgReceiver_) {
return;
}
msgReceiver_->Shutdown();
this->EnsureChannelOpen();
}
} // namespace OHOS::Request

View File

@ -21,10 +21,10 @@
#include <ctime>
#include "download_server_ipc_interface_code.h"
#include "iremote_broker.h"
#include "log.h"
#include "parcel_helper.h"
#include "download_server_ipc_interface_code.h"
namespace OHOS::Request {
using namespace OHOS::HiviewDFX;
@ -370,4 +370,60 @@ int32_t RequestServiceProxy::Off(const std::string &type, const std::string &tid
}
return E_OK;
}
int32_t RequestServiceProxy::OpenChannel(int32_t &sockFd)
{
REQUEST_HILOGD("OpenChannel");
MessageParcel data, reply;
MessageOption option;
data.WriteInterfaceToken(GetDescriptor());
int32_t ret =
Remote()->SendRequest(static_cast<uint32_t>(RequestInterfaceCode::CMD_OPENCHANNEL), data, reply, option);
if (ret != ERR_NONE) {
REQUEST_HILOGE("send request ret code is %{public}d", ret);
return E_SERVICE_ERROR;
}
int32_t errCode = reply.ReadInt32();
if (errCode != E_OK) {
return errCode;
}
sockFd = reply.ReadFileDescriptor();
REQUEST_HILOGD("OpenChannel sockFd: %{public}d", sockFd);
return E_OK;
}
int32_t RequestServiceProxy::Subscribe(const std::string &taskId, int32_t cbType)
{
REQUEST_HILOGD("Subscribe");
MessageParcel data, reply;
MessageOption option;
data.WriteInterfaceToken(GetDescriptor());
data.WriteString(taskId);
data.WriteInt32(cbType);
int32_t ret =
Remote()->SendRequest(static_cast<uint32_t>(RequestInterfaceCode::CMD_SUBSCRIBE), data, reply, option);
if (ret != ERR_NONE) {
REQUEST_HILOGE("send request ret code is %{public}d", ret);
return E_SERVICE_ERROR;
}
return E_OK;
}
int32_t RequestServiceProxy::Unsubscribe(const std::string &taskId, int32_t cbType)
{
REQUEST_HILOGD("Unsubscribe");
MessageParcel data, reply;
MessageOption option;
data.WriteInterfaceToken(GetDescriptor());
data.WriteString(taskId);
data.WriteInt32(cbType);
int32_t ret =
Remote()->SendRequest(static_cast<uint32_t>(RequestInterfaceCode::CMD_UNSUBSCRIBE), data, reply, option);
if (ret != ERR_NONE) {
REQUEST_HILOGE("send request ret code is %{public}d", ret);
return E_SERVICE_ERROR;
}
return E_OK;
}
} // namespace OHOS::Request

View File

@ -19,6 +19,7 @@
#include "isystem_ability_load_callback.h"
#include "log.h"
#include "request_manager.h"
#include "request_manager_impl.h"
#include "system_ability_definition.h"
namespace OHOS::Request {
@ -29,7 +30,7 @@ void RequestSyncLoadCallback::OnLoadSystemAbilitySuccess(
REQUEST_HILOGE("start systemAbilityId is not download server");
return;
}
RequestManager::GetInstance()->LoadServerSuccess();
RequestManagerImpl::GetInstance()->LoadServerSuccess();
}
void RequestSyncLoadCallback::OnLoadSystemAbilityFail(int32_t systemAbilityId)
@ -38,6 +39,6 @@ void RequestSyncLoadCallback::OnLoadSystemAbilityFail(int32_t systemAbilityId)
REQUEST_HILOGE("start systemAbilityId is not download server");
return;
}
RequestManager::GetInstance()->LoadServerFail();
RequestManagerImpl::GetInstance()->LoadServerFail();
}
} // namespace OHOS::Request

View File

@ -0,0 +1,220 @@
/*
* Copyright (c) 2024 Huawei Device Co., Ltd.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "response_message_receiver.h"
#include <stdlib.h>
#include <unistd.h>
#include <sstream>
#include <string>
#include <vector>
#include "log.h"
namespace OHOS::Request {
static constexpr int32_t INT32_SIZE = 4;
static constexpr int32_t INT16_SIZE = 2;
std::shared_ptr<OHOS::AppExecFwk::EventHandler> serviceHandler_;
// retval == 0 means success, < 0 means failed
static int32_t Int32FromParcel(int32_t &num, char *&parcel, int32_t &size)
{
if (size < INT32_SIZE) {
REQUEST_HILOGE("message not complete");
return -1;
}
num = *reinterpret_cast<int32_t *>(parcel);
parcel += INT32_SIZE;
size -= INT32_SIZE;
return 0;
}
static int16_t Int16FromParcel(int16_t &num, char *&parcel, int32_t &size)
{
if (size < INT16_SIZE) {
REQUEST_HILOGE("message not complete");
return -1;
}
num = *reinterpret_cast<int16_t *>(parcel);
parcel += INT16_SIZE;
size -= INT16_SIZE;
return 0;
}
static int32_t StringFromParcel(std::string &str, char *&parcel, int32_t &size)
{
int32_t i = 0;
while (i < size && parcel[i] != '\0') {
++i;
}
if (i < size) {
str.assign(parcel, i);
parcel += (i + 1);
size -= (i + 1);
return 0;
} else {
REQUEST_HILOGE("message not complete");
return -1;
}
}
static int32_t ResponseHeaderFromParcel(
std::map<std::string, std::vector<std::string>> &headers, char *&parcel, int32_t &size)
{
std::string s(parcel, size);
std::stringstream ss(s);
std::string line;
while (std::getline(ss, line, '\n')) {
std::stringstream keyValue(line);
std::string key, valueLine;
std::getline(keyValue, key, ':');
std::getline(keyValue, valueLine);
std::stringstream values(valueLine);
std::string value;
while (getline(values, value, ',')) {
headers[key].push_back(value);
}
}
return 0;
}
ResponseMessageReceiver::ResponseMessageReceiver(IResponseMessageHandler *handler, int32_t sockFd)
: handler_(handler), sockFd_(sockFd)
{
}
void ResponseMessageReceiver::BeginReceive()
{
std::shared_ptr<OHOS::AppExecFwk::EventRunner> runner = OHOS::AppExecFwk::EventRunner::GetMainEventRunner();
serviceHandler_ = std::make_shared<OHOS::AppExecFwk::EventHandler>(runner);
serviceHandler_->AddFileDescriptorListener(
sockFd_, OHOS::AppExecFwk::FILE_DESCRIPTOR_INPUT_EVENT, shared_from_this(), "subscribe");
}
// ret 0 if success, ret < 0 if fail
static int32_t MsgHeaderParcel(int32_t &msgId, int16_t &msgType, int16_t &bodySize, char *&parcel, int32_t &size)
{
int32_t magicNum = 0;
if (Int32FromParcel(magicNum, parcel, size) != 0) {
return -1;
}
if (magicNum != ResponseMessageReceiver::RESPONSE_MAGIC_NUM) {
REQUEST_HILOGE("Bad magic num, %{public}d", magicNum);
return -1;
}
if (Int32FromParcel(msgId, parcel, size) != 0) {
return -1;
}
if (Int16FromParcel(msgType, parcel, size) != 0) {
return -1;
}
if (Int16FromParcel(bodySize, parcel, size) != 0) {
return -1;
}
return 0;
}
static int32_t MsgFromParcel(std::shared_ptr<Response> &response, char *&parcel, int32_t &size)
{
int32_t tid;
if (Int32FromParcel(tid, parcel, size) != 0) {
REQUEST_HILOGE("Bad tid");
return -1;
}
response->taskId = std::to_string(tid);
if (StringFromParcel(response->version, parcel, size) != 0) {
REQUEST_HILOGE("Bad version");
return -1;
}
if (Int32FromParcel(response->statusCode, parcel, size) != 0) {
REQUEST_HILOGE("Bad statusCode");
return -1;
}
if (StringFromParcel(response->reason, parcel, size) != 0) {
REQUEST_HILOGE("Bad reason");
return -1;
}
if (ResponseHeaderFromParcel(response->headers, parcel, size) != 0) {
REQUEST_HILOGE("Bad headers");
return -1;
}
return 0;
}
void ResponseMessageReceiver::OnReadable(int32_t fd)
{
int32_t msgId;
int16_t msgType;
int16_t headerSize;
std::shared_ptr<Response> response = std::make_shared<Response>();
int readSize = ResponseMessageReceiver::RESPONSE_MAX_SIZE;
char buffer[readSize];
int32_t length = read(fd, buffer, readSize);
if (length <= 0) {
return;
}
REQUEST_HILOGD("read response: %{public}d", length);
char *leftBuf = buffer;
int32_t leftLen = length;
MsgHeaderParcel(msgId, msgType, headerSize, leftBuf, leftLen);
if (msgId != messageId_) {
REQUEST_HILOGE("Bad messageId");
return;
}
if (headerSize != length) {
REQUEST_HILOGE("Bad headerSize, %{public}d, %{public}d", length, headerSize);
}
++messageId_;
if (MsgFromParcel(response, leftBuf, leftLen) == 0) {
this->handler_->OnResponseReceive(response);
}
}
void ResponseMessageReceiver::OnShutdown(int32_t fd)
{
serviceHandler_->RemoveFileDescriptorListener(fd);
close(fd);
this->handler_->OnChannelBroken();
}
void ResponseMessageReceiver::OnException(int32_t fd)
{
serviceHandler_->RemoveFileDescriptorListener(fd);
close(fd);
this->handler_->OnChannelBroken();
}
void ResponseMessageReceiver::Shutdown()
{
serviceHandler_->RemoveFileDescriptorListener(sockFd_);
close(sockFd_);
this->handler_->OnChannelBroken();
}
} // namespace OHOS::Request

View File

@ -28,7 +28,7 @@ namespace OHOS::Request {
class ApplicationStateObserver {
public:
~ApplicationStateObserver();
using RegCallBack = std::function<void(int32_t uid, int32_t state)>;
using RegCallBack = std::function<void(int32_t uid, int32_t state, int32_t pid)>;
static ApplicationStateObserver &GetInstance();
bool RegisterAppStateChanged(RegCallBack &&callback);
@ -46,7 +46,7 @@ public:
void OnProcessDied(const AppExecFwk::ProcessData &processData) override;
public:
void RunCallback(int32_t uid, int32_t state);
void RunCallback(int32_t uid, int32_t state, int32_t pid);
ApplicationStateObserver &appStateObserver_;
};
ApplicationStateObserver();
@ -58,7 +58,7 @@ public:
extern "C" {
#endif
typedef void (*APPStateCallback)(int32_t, int32_t);
typedef void (*APPStateCallback)(int32_t, int32_t, int32_t);
void RegisterAPPStateCallback(APPStateCallback fun);
#ifdef __cplusplus

View File

@ -84,7 +84,7 @@ void ApplicationStateObserver::AppProcessState::OnAbilityStateChanged(
{
REQUEST_HILOGD("OnAbilityStateChanged uid=%{public}d, bundleName=%{public}s,state=%{public}d",
abilityStateData.uid, abilityStateData.bundleName.c_str(), abilityStateData.abilityState);
RunCallback(abilityStateData.uid, abilityStateData.abilityState);
RunCallback(abilityStateData.uid, abilityStateData.abilityState, abilityStateData.pid);
}
void ApplicationStateObserver::AppProcessState::OnExtensionStateChanged(
@ -98,18 +98,18 @@ void ApplicationStateObserver::AppProcessState::OnProcessCreated(const AppExecFw
void ApplicationStateObserver::AppProcessState::OnProcessDied(const AppExecFwk::ProcessData &processData)
{
REQUEST_HILOGD("OnProcessDied uid=%{public}d, bundleName=%{public}s, state=%{public}d", processData.uid,
processData.bundleName.c_str(), static_cast<int32_t>(processData.state));
RunCallback(processData.uid, static_cast<int32_t>(processData.state));
REQUEST_HILOGD("OnProcessDied uid=%{public}d, bundleName=%{public}s, state=%{public}d, pid=%{public}d",
processData.uid, processData.bundleName.c_str(), static_cast<int32_t>(processData.state), processData.pid);
RunCallback(processData.uid, static_cast<int32_t>(processData.state), processData.pid);
}
void ApplicationStateObserver::AppProcessState::RunCallback(int32_t uid, int32_t state)
void ApplicationStateObserver::AppProcessState::RunCallback(int32_t uid, int32_t state, int32_t pid)
{
if (appStateObserver_.callback_ == nullptr) {
REQUEST_HILOGE("appStateObserver callback is nullptr");
return;
}
appStateObserver_.callback_(uid, state);
appStateObserver_.callback_(uid, state, pid);
}
} // namespace OHOS::Request

View File

@ -30,12 +30,13 @@ extern crate log;
mod hilog;
mod error;
mod manager;
mod manage;
mod task;
mod utils;
cfg_oh! {
mod init;
mod notify;
mod trace;
mod sys_event;
mod service;

View File

@ -17,7 +17,7 @@ use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use crate::error::ErrorCode;
use crate::manager::{SystemProxyManager, TaskManager};
use crate::manage::{SystemProxyManager, TaskManager};
use crate::task::config::{TaskConfig, Version};
use crate::task::ffi::{CTaskConfig, CTaskInfo};
use crate::task::info::State;

View File

@ -11,7 +11,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::manager::TaskManager;
use crate::manage::TaskManager;
use crate::task::info::{DumpAllEachInfo, DumpAllInfo, DumpOneInfo};
impl TaskManager {

View File

@ -11,7 +11,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::manager::TaskManager;
use crate::manage::TaskManager;
use crate::task::config::TaskConfig;
impl TaskManager {

View File

@ -176,6 +176,14 @@ impl EventMessage {
pub(crate) fn network_change() -> Self {
Self::State(StateMessage::NetworkChange)
}
pub(crate) fn subscribe(task_id: u32, token_id: u64) -> (Self, Recv<ErrorCode>) {
let (tx, rx) = channel::<ErrorCode>();
(
Self::Task(TaskMessage::Subscribe(task_id, token_id, tx)),
Recv::new(rx),
)
}
}
pub(crate) enum ServiceMessage {
@ -197,6 +205,7 @@ pub(crate) enum ServiceMessage {
pub(crate) enum TaskMessage {
Finished(u32),
Subscribe(u32, u64, Sender<ErrorCode>),
}
pub(crate) enum StateMessage {
@ -294,6 +303,11 @@ impl Debug for TaskMessage {
.debug_struct("Finished")
.field("task_id", task_id)
.finish(),
Self::Subscribe(task_id, token_id, _) => f
.debug_struct("Subscribe")
.field("task_id", task_id)
.field("token_id", token_id)
.finish(),
}
}
}

View File

@ -12,7 +12,7 @@
// limitations under the License.
use crate::error::ErrorCode;
use crate::manager::TaskManager;
use crate::manage::TaskManager;
use crate::task::reason::Reason;
impl TaskManager {

View File

@ -11,7 +11,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::manager::TaskManager;
use crate::manage::TaskManager;
use crate::task::config::Action;
use crate::task::ffi::{CTaskInfo, DeleteCTaskInfo};
use crate::task::info::TaskInfo;

View File

@ -12,7 +12,7 @@
// limitations under the License.
use super::show::Show;
use crate::manager::TaskManager;
use crate::manage::TaskManager;
use crate::task::ffi::DeleteCTaskInfo;
use crate::task::info::TaskInfo;

View File

@ -12,7 +12,7 @@
// limitations under the License.
use crate::error::ErrorCode;
use crate::manager::TaskManager;
use crate::manage::TaskManager;
use crate::task::ffi::ChangeRequestTaskState;
use crate::task::info::State;
use crate::task::reason::Reason;

View File

@ -14,11 +14,11 @@
use std::sync::atomic::Ordering;
use crate::error::ErrorCode;
use crate::manager::TaskManager;
use crate::manage::TaskManager;
use crate::task::info::State;
cfg_oh! {
use crate::manager::Notifier;
use crate::manage::Notifier;
}
impl TaskManager {

View File

@ -11,7 +11,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::manager::TaskManager;
use crate::manage::TaskManager;
use crate::utils::c_wrapper::{CFilter, CVectorWrapper, DeleteCVectorWrapper};
use crate::utils::filter::Filter;

View File

@ -11,7 +11,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::manager::TaskManager;
use crate::manage::TaskManager;
use crate::task::ffi::{CTaskInfo, DeleteCTaskInfo};
use crate::task::info::TaskInfo;

View File

@ -15,8 +15,9 @@ use std::sync::atomic::Ordering;
use std::sync::Arc;
use crate::error::ErrorCode;
use crate::manager::events::{EventMessage, TaskMessage};
use crate::manager::TaskManager;
use crate::manage::events::{EventMessage, TaskMessage};
use crate::manage::TaskManager;
use crate::service::ability::RequestAbility;
use crate::task::info::{ApplicationState, State};
use crate::task::reason::Reason;
use crate::task::request_task::run;
@ -88,6 +89,7 @@ impl TaskManager {
ylong_runtime::spawn(async move {
run(task.clone()).await;
RequestAbility::client_manager().notify_task_finished(task_id);
tx.send(EventMessage::Task(TaskMessage::Finished(
task.conf.common_data.task_id,
)))

View File

@ -14,7 +14,7 @@
use std::sync::atomic::Ordering;
use crate::error::ErrorCode;
use crate::manager::TaskManager;
use crate::manage::TaskManager;
use crate::task::info::State;
use crate::task::reason::Reason;

View File

@ -11,7 +11,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::manager::TaskManager;
use crate::manage::TaskManager;
use crate::task::ffi::{CTaskInfo, DeleteCTaskInfo};
use crate::task::info::TaskInfo;
use crate::utils::c_wrapper::CStringWrapper;

View File

@ -15,13 +15,13 @@ use std::sync::atomic::{AtomicU8, Ordering};
use std::sync::Arc;
use super::TaskManager;
use crate::manager::scheduled;
use crate::manage::scheduled;
use crate::task::config::Action;
use crate::task::info::{ApplicationState, Mode, State};
use crate::task::reason::Reason;
cfg_oh! {
use crate::manager::Notifier;
use crate::manage::Notifier;
}
impl TaskManager {

View File

@ -14,8 +14,8 @@
use std::sync::atomic::{AtomicU8, Ordering};
use std::sync::Arc;
use crate::notify::{Event, NotifyEvent};
use crate::service::ability::RequestAbility;
use crate::service::notify::{Event, NotifyEvent};
use crate::task::config::Version;
use crate::task::info::ApplicationState;
use crate::task::notify::NotifyData;

View File

@ -24,6 +24,7 @@ use super::events::{
use super::qos::{Qos, QosChange, QosQueue};
use super::scheduled;
use crate::error::ErrorCode;
use crate::service::ability::PANIC_INFO;
use crate::task::config::Version;
use crate::task::info::{ApplicationState, State};
use crate::task::reason::Reason;
@ -31,10 +32,8 @@ use crate::task::request_task::RequestTask;
use crate::task::tick::Clock;
use crate::utils::c_wrapper::CStringWrapper;
static mut TASKMANAGER_PANIC_INFO: Option<String> = None;
cfg_oh! {
use crate::manager::Notifier;
use crate::manage::Notifier;
}
pub(crate) struct TaskManager {
@ -86,7 +85,7 @@ impl TaskManagerEntry {
pub(crate) fn send_event(&self, event: EventMessage) -> bool {
if self.tx.send(event).is_err() {
unsafe {
if let Some(e) = TASKMANAGER_PANIC_INFO.as_ref() {
if let Some(e) = PANIC_INFO.as_ref() {
error!("Sends TaskManager event failed {}", e);
} else {
info!("TaskManager is unloading")
@ -102,13 +101,6 @@ impl TaskManager {
pub(crate) fn init() -> TaskManagerEntry {
debug!("TaskManager init");
std::panic::set_hook(Box::new(|info| {
error!("{}", info.to_string());
unsafe {
TASKMANAGER_PANIC_INFO = Some(info.to_string());
}
}));
ylong_runtime::builder::RuntimeBuilder::new_multi_thread()
.worker_num(4)
.build_global()
@ -187,6 +179,27 @@ impl TaskManager {
};
self.after_task_processed(&task);
}
TaskMessage::Subscribe(task_id, token_id, tx) => {
if let Some(task) = self.tasks.get(&task_id) {
if task.conf.common_data.token_id == token_id {
let _ = tx.send(ErrorCode::ErrOk);
} else {
let _ = tx.send(ErrorCode::Permission);
}
return;
}
for task in &self.restoring_tasks {
if task.conf.common_data.task_id == task_id {
if task.conf.common_data.token_id == token_id {
let _ = tx.send(ErrorCode::ErrOk);
} else {
let _ = tx.send(ErrorCode::Permission);
}
return;
}
}
let _ = tx.send(ErrorCode::TaskNotFound);
}
}
}

View File

@ -17,7 +17,7 @@ use std::sync::Arc;
use super::task_manager::GetTopBundleName;
use super::TaskManager;
use crate::manager::monitor::IsOnline;
use crate::manage::monitor::IsOnline;
use crate::task::config::{TaskConfig, Version};
use crate::task::ffi::{CTaskConfig, ChangeRequestTaskState};
use crate::task::info::{ApplicationState, State};

View File

@ -18,7 +18,7 @@ use ylong_runtime::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedS
use ylong_runtime::sync::oneshot::Sender;
use crate::error::ErrorCode;
use crate::service::notify::{Event, NotifyData, NotifyEvent};
use crate::notify::{Event, NotifyData, NotifyEvent};
use crate::service::RequestNotifyInterfaceCode;
use crate::task::info::State;

View File

@ -30,19 +30,23 @@ use std::hint;
use std::mem::MaybeUninit;
use std::sync::atomic::{AtomicU8, Ordering};
use crate::manager::task_manager::TaskManagerEntry;
use crate::manager::TaskManager;
use crate::manage::task_manager::TaskManagerEntry;
use crate::manage::TaskManager;
use crate::notify::{NotifyEntry, NotifyManager};
use crate::service::client::{ClientManager, ClientManagerEntry};
use crate::service::listener::{AppStateListener, NetworkChangeListener};
use crate::service::notify::{NotifyEntry, NotifyManager};
static mut REQUEST_ABILITY: MaybeUninit<RequestAbility> = MaybeUninit::uninit();
static STATE: AtomicU8 = AtomicU8::new(RequestAbility::NOT_INITED);
pub(crate) static mut PANIC_INFO: Option<String> = None;
pub(crate) struct RequestAbility {
manager: TaskManagerEntry,
notify: NotifyEntry,
app: AppStateListener,
network: NetworkChangeListener,
client_manager: ClientManagerEntry,
}
impl RequestAbility {
@ -63,6 +67,14 @@ impl RequestAbility {
}
pub(crate) fn init() {
std::panic::set_hook(Box::new(|info| unsafe {
let trace = std::backtrace::Backtrace::force_capture();
let mut info = info.to_string();
info.push_str(trace.to_string().as_str());
error!("{}", info);
PANIC_INFO = Some(info);
}));
if STATE
.compare_exchange(
Self::NOT_INITED,
@ -78,6 +90,7 @@ impl RequestAbility {
notify: NotifyManager::init(),
app: AppStateListener::init(),
network: NetworkChangeListener::init(),
client_manager: ClientManager::init(),
});
RequestInitServiceHandler();
};
@ -113,6 +126,10 @@ impl RequestAbility {
pub(crate) fn task_manager() -> TaskManagerEntry {
Self::get_instance().manager.clone()
}
pub(crate) fn client_manager() -> ClientManagerEntry {
Self::get_instance().client_manager.clone()
}
}
#[cfg(feature = "oh")]

View File

@ -0,0 +1,261 @@
// Copyright (C) 2024 Huawei Device Co., Ltd.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::net::Shutdown;
use std::os::fd::AsRawFd;
use ylong_http_client::Headers;
use ylong_runtime::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
use ylong_runtime::sync::oneshot::Sender;
use super::{Client, ClientEvent};
use crate::error::ErrorCode;
use crate::service::ability::PANIC_INFO;
const REQUEST_MAGIC_NUM: u32 = 0x43434646;
const HEADERS_MAX_SIZE: u16 = 8 * 1024;
const POSITION_OF_LENGTH: u32 = 10;
pub(crate) enum MessageType {
HttpResponse = 0,
}
#[derive(Clone)]
pub(crate) struct ClientManagerEntry {
tx: UnboundedSender<ClientEvent>,
}
impl ClientManagerEntry {
fn new(tx: UnboundedSender<ClientEvent>) -> Self {
Self { tx }
}
pub(crate) fn send_event(&self, event: ClientEvent) -> bool {
if self.tx.send(event).is_err() {
unsafe {
if let Some(e) = PANIC_INFO.as_ref() {
error!("Sends ClientManager event failed {}", e);
} else {
info!("ClientManager is unloading");
}
}
return false;
}
true
}
}
pub(crate) struct ClientManager {
clients: HashMap<u64, Client>,
pid_map: HashMap<u32, u64>,
rx: UnboundedReceiver<ClientEvent>,
}
impl ClientManager {
pub(crate) fn init() -> ClientManagerEntry {
debug!("ClientManager init");
let (tx, rx) = unbounded_channel();
let client_manager = ClientManager {
clients: HashMap::new(),
pid_map: HashMap::new(),
rx,
};
ylong_runtime::spawn(client_manager.run());
ClientManagerEntry::new(tx)
}
async fn run(mut self) {
loop {
let recv = match self.rx.recv().await {
Ok(message) => message,
Err(e) => {
error!("ClientManager recv error {:?}", e);
continue;
}
};
match recv {
ClientEvent::OpenChannel(pid, uid, token_id, tx) => {
self.handle_open_channel(pid, uid, token_id, tx)
}
ClientEvent::Subscribe(tid, pid, uid, token_id, tx) => {
self.handle_subscribe(tid, pid, uid, token_id, tx)
}
ClientEvent::Unsubscribe(tid, tx) => self.handle_unsubscribe(tid, tx),
ClientEvent::TaskFinished(tid) => self.handle_task_finished(tid),
ClientEvent::Terminate(pid, tx) => self.handle_process_terminated(pid, tx),
ClientEvent::SendResponse(tid, version, status_code, reason, headers) => {
self.handle_send_response(tid, version, status_code, reason, headers)
.await
}
}
debug!("ClientManager handle message done");
}
}
fn handle_open_channel(
&mut self,
pid: u64,
uid: u64,
token_id: u64,
tx: Sender<Result<i32, ErrorCode>>,
) {
let client: &Client;
match self.clients.entry(pid) {
std::collections::hash_map::Entry::Occupied(o) => {
client = o.get();
let _ = tx.send(Ok(client.client_sock_fd.as_raw_fd()));
}
std::collections::hash_map::Entry::Vacant(v) => {
match Client::constructor(pid, uid, token_id) {
Some(client) => {
let _ = tx.send(Ok(client.client_sock_fd.as_raw_fd()));
v.insert(client);
}
None => {
let _ = tx.send(Err(ErrorCode::Other));
}
}
}
}
}
fn handle_subscribe(
&mut self,
tid: u32,
pid: u64,
uid: u64,
token_id: u64,
tx: Sender<ErrorCode>,
) {
if let Some(client) = self.clients.get_mut(&pid) {
let ret = client.handle_subscribe(tid, pid, uid, token_id);
if ret == ErrorCode::ErrOk {
self.pid_map.insert(tid, pid);
let _ = tx.send(ErrorCode::ErrOk);
return;
}
} else {
error!("channel not open");
}
let _ = tx.send(ErrorCode::Other);
}
fn handle_unsubscribe(&mut self, tid: u32, tx: Sender<ErrorCode>) {
if let Some(&pid) = self.pid_map.get(&tid) {
self.pid_map.remove(&tid);
if let Some(client) = self.clients.get_mut(&pid) {
client.handle_unsubscribe(tid);
let _ = tx.send(ErrorCode::ErrOk);
return;
} else {
debug!("client not found");
}
} else {
debug!("unsubscribe tid not found");
}
let _ = tx.send(ErrorCode::Other);
}
fn handle_task_finished(&mut self, tid: u32) {
if let Some(&pid) = self.pid_map.get(&tid) {
self.pid_map.remove(&tid);
if let Some(client) = self.clients.get_mut(&pid) {
client.handle_unsubscribe(tid);
} else {
debug!("client not found");
}
} else {
debug!("unsubscribe tid not found");
}
}
fn handle_process_terminated(&mut self, pid: u64, tx: Sender<ErrorCode>) {
if let Some(client) = self.clients.get_mut(&pid) {
let _ = client.client_sock_fd.shutdown(Shutdown::Both);
let _ = client.server_sock_fd.shutdown(Shutdown::Both);
debug!("client terminate, pid: {}", pid);
for (k, _v) in client.subscribed_map.iter() {
self.pid_map.remove(k);
}
self.clients.remove(&pid);
} else {
debug!("terminate pid not found");
}
let _ = tx.send(ErrorCode::ErrOk);
}
async fn handle_send_response(
&mut self,
tid: u32,
version: String,
status_code: u32,
reason: String,
headers: Headers,
) {
if let Some(&pid) = self.pid_map.get(&tid) {
if let Some(client) = self.clients.get_mut(&pid) {
let mut response = Vec::<u8>::new();
response.extend_from_slice(&REQUEST_MAGIC_NUM.to_le_bytes());
response.extend_from_slice(&client.message_id.to_le_bytes());
client.message_id += 1;
let message_type = MessageType::HttpResponse as u16;
response.extend_from_slice(&message_type.to_le_bytes());
let message_body_size: u16 = 0;
response.extend_from_slice(&message_body_size.to_le_bytes());
response.extend_from_slice(&tid.to_le_bytes());
response.extend_from_slice(&version.into_bytes());
response.push(b'\0');
response.extend_from_slice(&status_code.to_le_bytes());
response.extend_from_slice(&reason.into_bytes());
response.push(b'\0');
for (k, v) in headers {
response.extend_from_slice(k.as_bytes());
response.push(b':');
for (i, sub_value) in v.iter().enumerate() {
if i != 0 {
response.push(b',');
}
response.extend_from_slice(sub_value);
}
response.push(b'\n');
}
let mut size = response.len() as u16;
if size > HEADERS_MAX_SIZE {
response.truncate(HEADERS_MAX_SIZE as usize);
size = HEADERS_MAX_SIZE;
}
debug!("send response size, {:?}", size);
let size = size.to_le_bytes();
response[POSITION_OF_LENGTH as usize] = size[0];
response[(POSITION_OF_LENGTH + 1) as usize] = size[1];
client.send_response(tid, response).await;
} else {
debug!("response client not found");
}
} else {
debug!("response pid not found");
}
}
}

View File

@ -0,0 +1,179 @@
// Copyright (C) 2024 Huawei Device Co., Ltd.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
mod manager;
use std::collections::HashMap;
pub(crate) use manager::{ClientManager, ClientManagerEntry};
use ylong_http_client::Headers;
use ylong_runtime::net::UnixDatagram;
use ylong_runtime::sync::oneshot::{channel, Sender};
use crate::error::ErrorCode;
use crate::utils::Recv;
pub(crate) enum ClientEvent {
OpenChannel(u64, u64, u64, Sender<Result<i32, ErrorCode>>),
Subscribe(u32, u64, u64, u64, Sender<ErrorCode>),
Unsubscribe(u32, Sender<ErrorCode>),
TaskFinished(u32),
Terminate(u64, Sender<ErrorCode>),
SendResponse(u32, String, u32, String, Headers),
}
impl ClientManagerEntry {
pub(crate) fn open_channel(&self, pid: u64, uid: u64, token_id: u64) -> Result<i32, ErrorCode> {
let (tx, rx) = channel::<Result<i32, ErrorCode>>();
let event = ClientEvent::OpenChannel(pid, uid, token_id, tx);
if !self.send_event(event) {
return Err(ErrorCode::Other);
}
let rx = Recv::new(rx);
match rx.get() {
Some(ret) => ret,
None => {
error!("open_channel failed");
Err(ErrorCode::Other)
}
}
}
pub(crate) fn subscribe(&self, tid: u32, pid: u64, uid: u64, token_id: u64) -> ErrorCode {
let (tx, rx) = channel::<ErrorCode>();
let event = ClientEvent::Subscribe(tid, pid, uid, token_id, tx);
if !self.send_event(event) {
return ErrorCode::Other;
}
let rx = Recv::new(rx);
match rx.get() {
Some(ret) => ret,
None => {
error!("subscribe failed");
ErrorCode::Other
}
}
}
pub(crate) fn unsubscribe(&self, tid: u32) -> ErrorCode {
let (tx, rx) = channel::<ErrorCode>();
let event = ClientEvent::Unsubscribe(tid, tx);
if !self.send_event(event) {
return ErrorCode::Other;
}
let rx = Recv::new(rx);
match rx.get() {
Some(ret) => ret,
None => {
error!("unsubscribe failed");
ErrorCode::Other
}
}
}
pub(crate) fn notify_task_finished(&self, tid: u32) {
let event = ClientEvent::TaskFinished(tid);
self.send_event(event);
}
pub(crate) fn notify_process_terminate(&self, pid: u64) -> ErrorCode {
let (tx, rx) = channel::<ErrorCode>();
let event = ClientEvent::Terminate(pid, tx);
if !self.send_event(event) {
return ErrorCode::Other;
}
let rx = Recv::new(rx);
match rx.get() {
Some(ret) => ret,
None => {
error!("notify_process_terminate failed");
ErrorCode::Other
}
}
}
pub(crate) fn send_response(
&self,
tid: u32,
version: String,
status_code: u32,
reason: String,
headers: Headers,
) {
let event = ClientEvent::SendResponse(tid, version, status_code, reason, headers);
let _ = self.send_event(event);
}
}
// uid and token_id will be used later
#[allow(dead_code)]
pub(crate) struct Client {
pub(crate) pid: u64,
pub(crate) uid: u64,
pub(crate) token_id: u64,
pub(crate) message_id: u32,
pub(crate) subscribed_map: HashMap<u32, bool>,
pub(crate) server_sock_fd: UnixDatagram,
pub(crate) client_sock_fd: UnixDatagram,
}
impl Client {
pub(crate) fn constructor(pid: u64, uid: u64, token_id: u64) -> Option<Self> {
let (server_sock_fd, client_sock_fd) = match UnixDatagram::pair() {
Ok((server_sock_fd, client_sock_fd)) => (server_sock_fd, client_sock_fd),
Err(err) => {
error!("can't create a pair of sockets, {:?}", err);
return None;
}
};
Some(Client {
pid,
uid,
token_id,
message_id: 1,
subscribed_map: HashMap::new(),
server_sock_fd,
client_sock_fd,
})
}
pub(crate) fn handle_subscribe(
&mut self,
tid: u32,
_pid: u64,
_uid: u64,
_token_id: u64,
) -> ErrorCode {
if let Some(val) = self.subscribed_map.get_mut(&tid) {
*val = true;
} else {
self.subscribed_map.insert(tid, true);
}
ErrorCode::ErrOk
}
pub(crate) fn handle_unsubscribe(&mut self, tid: u32) {
if self.subscribed_map.remove(&tid).is_none() {
error!("tid: {} not subscribed", tid);
}
}
pub(crate) async fn send_response(&mut self, tid: u32, response: Vec<u8>) {
if let Some(is_subscribed) = self.subscribed_map.get(&tid) {
if *is_subscribed {
let _ = self.server_sock_fd.send(&response).await;
}
}
}
}

View File

@ -19,7 +19,7 @@ use ipc_rust::{
};
use crate::error::ErrorCode;
use crate::manager::events::EventMessage;
use crate::manage::events::EventMessage;
use crate::service::ability::RequestAbility;
use crate::service::permission::PermissionChecker;
use crate::service::{get_calling_bundle, open_file_readonly, open_file_readwrite};

View File

@ -15,7 +15,7 @@ use std::io::Write;
use ipc_rust::{FileDesc, IpcStatusCode, String16};
use crate::manager::events::EventMessage;
use crate::manage::events::EventMessage;
use crate::service::ability::RequestAbility;
const HELP_MSG: &str = "usage:\n\

View File

@ -14,7 +14,7 @@
use ipc_rust::{get_calling_uid, BorrowedMsgParcel, IpcResult, IpcStatusCode};
use crate::error::ErrorCode;
use crate::manager::events::EventMessage;
use crate::manage::events::EventMessage;
use crate::service::ability::RequestAbility;
use crate::service::serialize_task_config;

View File

@ -16,6 +16,7 @@ mod dump;
mod get_task;
mod off;
mod on;
mod open_channel;
mod pause;
mod query;
mod query_mime_type;
@ -25,13 +26,16 @@ mod search;
mod show;
mod start;
mod stop;
mod subscribe;
mod touch;
mod unsubscribe;
pub(crate) use construct::Construct;
pub(crate) use dump::Dump;
pub(crate) use get_task::GetTask;
pub(crate) use off::Off;
pub(crate) use on::On;
pub(crate) use open_channel::OpenChannel;
pub(crate) use pause::Pause;
pub(crate) use query::Query;
pub(crate) use query_mime_type::QueryMimeType;
@ -41,4 +45,6 @@ pub(crate) use search::Search;
pub(crate) use show::Show;
pub(crate) use start::Start;
pub(crate) use stop::Stop;
pub(crate) use subscribe::Subscribe;
pub(crate) use touch::Touch;
pub(crate) use unsubscribe::Unsubscribe;

View File

@ -14,8 +14,8 @@
use ipc_rust::{BorrowedMsgParcel, IpcResult, IpcStatusCode};
use crate::error::ErrorCode;
use crate::notify::{Event, NotifyEvent};
use crate::service::ability::RequestAbility;
use crate::service::notify::{Event, NotifyEvent};
use crate::service::permission::PermissionChecker;
use crate::task::config::Version;

View File

@ -14,8 +14,8 @@
use ipc_rust::{BorrowedMsgParcel, IpcResult, IpcStatusCode, RemoteObj};
use crate::error::ErrorCode;
use crate::notify::{Event, NotifyEvent};
use crate::service::ability::RequestAbility;
use crate::service::notify::{Event, NotifyEvent};
use crate::service::permission::PermissionChecker;
use crate::task::config::Version;

View File

@ -0,0 +1,52 @@
// Copyright (C) 2024 Huawei Device Co., Ltd.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::fs::File;
use std::os::unix::io::FromRawFd;
use ipc_rust::{
get_calling_pid, get_calling_token_id, get_calling_uid, BorrowedMsgParcel, FileDesc, IpcResult,
IpcStatusCode,
};
use crate::error::ErrorCode;
use crate::service::ability::RequestAbility;
pub(crate) struct OpenChannel;
impl OpenChannel {
pub(crate) fn execute(
_data: &BorrowedMsgParcel,
reply: &mut BorrowedMsgParcel,
) -> IpcResult<()> {
info!("open channnel");
let pid = get_calling_pid();
let uid = get_calling_uid();
let token_id = get_calling_token_id();
match RequestAbility::client_manager().open_channel(pid, uid, token_id) {
Ok(fd) => {
let file = unsafe { File::from_raw_fd(fd) };
let file = FileDesc::new(file);
reply.write(&(ErrorCode::ErrOk as i32))?;
reply.write(&file)?;
info!("open channnel ok ");
Ok(())
}
Err(_) => {
error!("open_channel failed");
reply.write(&(ErrorCode::ParameterCheck as i32))?;
Err(IpcStatusCode::Failed)
}
}
}
}

View File

@ -14,7 +14,7 @@
use ipc_rust::{get_calling_uid, BorrowedMsgParcel, IpcResult, IpcStatusCode};
use crate::error::ErrorCode;
use crate::manager::events::EventMessage;
use crate::manage::events::EventMessage;
use crate::service::ability::RequestAbility;
use crate::service::permission::PermissionChecker;
use crate::task::config::Version;

View File

@ -14,7 +14,7 @@
use ipc_rust::{BorrowedMsgParcel, IpcResult, IpcStatusCode};
use crate::error::ErrorCode;
use crate::manager::events::EventMessage;
use crate::manage::events::EventMessage;
use crate::service::ability::RequestAbility;
use crate::service::permission::{PermissionChecker, QueryPermission};
use crate::service::{is_system_api, serialize_task_info};

View File

@ -14,7 +14,7 @@
use ipc_rust::{get_calling_uid, BorrowedMsgParcel, IpcResult, IpcStatusCode};
use crate::error::ErrorCode;
use crate::manager::events::EventMessage;
use crate::manage::events::EventMessage;
use crate::service::ability::RequestAbility;
use crate::service::permission::PermissionChecker;

View File

@ -14,7 +14,7 @@
use ipc_rust::{get_calling_uid, BorrowedMsgParcel, IpcResult, IpcStatusCode};
use crate::error::ErrorCode;
use crate::manager::events::EventMessage;
use crate::manage::events::EventMessage;
use crate::service::ability::RequestAbility;
use crate::service::permission::PermissionChecker;
use crate::task::config::Version;

View File

@ -14,7 +14,7 @@
use ipc_rust::{get_calling_uid, BorrowedMsgParcel, IpcResult, IpcStatusCode};
use crate::error::ErrorCode;
use crate::manager::events::EventMessage;
use crate::manage::events::EventMessage;
use crate::service::ability::RequestAbility;
use crate::service::permission::PermissionChecker;

View File

@ -13,7 +13,7 @@
use ipc_rust::{BorrowedMsgParcel, IpcResult, IpcStatusCode};
use crate::manager::events::EventMessage;
use crate::manage::events::EventMessage;
use crate::service::ability::RequestAbility;
use crate::service::{get_calling_bundle, is_system_api};
use crate::utils::filter::{CommonFilter, Filter};

View File

@ -14,7 +14,7 @@
use ipc_rust::{get_calling_uid, BorrowedMsgParcel, IpcResult, IpcStatusCode};
use crate::error::ErrorCode;
use crate::manager::events::EventMessage;
use crate::manage::events::EventMessage;
use crate::service::ability::RequestAbility;
use crate::service::permission::PermissionChecker;
use crate::service::serialize_task_info;

View File

@ -14,7 +14,7 @@
use ipc_rust::{get_calling_uid, BorrowedMsgParcel, IpcResult, IpcStatusCode};
use crate::error::ErrorCode;
use crate::manager::events::EventMessage;
use crate::manage::events::EventMessage;
use crate::service::ability::RequestAbility;
use crate::service::permission::PermissionChecker;

View File

@ -14,7 +14,7 @@
use ipc_rust::{get_calling_uid, BorrowedMsgParcel, IpcResult, IpcStatusCode};
use crate::error::ErrorCode;
use crate::manager::events::EventMessage;
use crate::manage::events::EventMessage;
use crate::service::ability::RequestAbility;
pub(crate) struct Stop;

View File

@ -0,0 +1,78 @@
// Copyright (C) 2024 Huawei Device Co., Ltd.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use ipc_rust::{
get_calling_pid, get_calling_token_id, get_calling_uid, BorrowedMsgParcel, IpcResult,
IpcStatusCode,
};
use crate::error::ErrorCode;
use crate::manage::events::EventMessage;
use crate::service::ability::RequestAbility;
pub(crate) struct Subscribe;
impl Subscribe {
pub(crate) fn execute(
data: &BorrowedMsgParcel,
reply: &mut BorrowedMsgParcel,
) -> IpcResult<()> {
info!("subscribe");
let tid: String = data.read()?;
debug!("Service subscribe: task_id is {}", tid);
let pid = get_calling_pid();
let uid = get_calling_uid();
let token_id = get_calling_token_id();
match tid.parse::<u32>() {
Ok(tid) => {
let (event, rx) = EventMessage::subscribe(tid, token_id);
if !RequestAbility::task_manager().send_event(event) {
reply.write(&(ErrorCode::Other as i32))?;
error!("send event failed");
return Err(IpcStatusCode::Failed);
}
let ret = match rx.get() {
Some(ret) => ret,
None => {
error!("Service construct: receives ret failed");
reply.write(&(ErrorCode::Other as i32))?;
return Err(IpcStatusCode::Failed);
}
};
if ret != ErrorCode::ErrOk {
error!("subscribe failed: {:?}", ret);
reply.write(&(ret as i32))?;
return Err(IpcStatusCode::Failed);
}
if RequestAbility::client_manager().subscribe(tid, pid, uid, token_id)
== ErrorCode::ErrOk
{
reply.write(&(ErrorCode::ErrOk as i32))?;
info!("subscribe ok");
Ok(())
} else {
error!("subscribe failed");
reply.write(&(ErrorCode::TaskNotFound as i32))?;
Err(IpcStatusCode::Failed)
}
}
_ => {
error!("Service subscribe: task_id not valid");
reply.write(&(ErrorCode::TaskNotFound as i32))?;
Err(IpcStatusCode::Failed)
}
}
}
}

View File

@ -14,7 +14,7 @@
use ipc_rust::{get_calling_uid, BorrowedMsgParcel, IpcResult, IpcStatusCode};
use crate::error::ErrorCode;
use crate::manager::events::EventMessage;
use crate::manage::events::EventMessage;
use crate::service::ability::RequestAbility;
use crate::service::serialize_task_info;

View File

@ -0,0 +1,47 @@
// Copyright (C) 2024 Huawei Device Co., Ltd.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use ipc_rust::{BorrowedMsgParcel, IpcResult, IpcStatusCode};
use crate::error::ErrorCode;
use crate::service::ability::RequestAbility;
pub(crate) struct Unsubscribe;
impl Unsubscribe {
pub(crate) fn execute(
data: &BorrowedMsgParcel,
reply: &mut BorrowedMsgParcel,
) -> IpcResult<()> {
info!("subscribe");
let tid: String = data.read()?;
debug!("Service unsubscribe: task_id is {}", tid);
match tid.parse::<u32>() {
Ok(tid) => {
if RequestAbility::client_manager().unsubscribe(tid) == ErrorCode::ErrOk {
reply.write(&(ErrorCode::ErrOk as i32))?;
Ok(())
} else {
error!("unsubscribe failed");
reply.write(&(ErrorCode::TaskNotFound as i32))?; // 错误码待统一处理
Err(IpcStatusCode::Failed)
}
}
_ => {
error!("Service unsubscribe: task_id not valid");
reply.write(&(ErrorCode::TaskNotFound as i32))?;
Err(IpcStatusCode::Failed)
}
}
}
}

View File

@ -44,6 +44,12 @@ pub(crate) enum RequestInterfaceCode {
GetTask,
/// system api deletes specifed tasks
Clear,
/// open the channel for ipc
OpenChannel,
/// subscribe response
Subscribe,
/// unsubscribe response
Unsubscribe,
}
/// Function code of RequestNotifyInterfaceCode

View File

@ -11,7 +11,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::manager::events::EventMessage;
use crate::manage::events::EventMessage;
use crate::service::ability::RequestAbility;
use crate::task::info::ApplicationState;
@ -33,13 +33,15 @@ impl AppStateListener {
}
}
extern "C" fn app_state_change_callback(uid: i32, state: i32) {
extern "C" fn app_state_change_callback(uid: i32, state: i32, pid: i32) {
info!("Receives app state change callback");
let state = match state {
2 => ApplicationState::Foreground,
4 => ApplicationState::Background,
5 => ApplicationState::Terminated,
5 => {
RequestAbility::client_manager().notify_process_terminate(pid as u64);
ApplicationState::Terminated
}
_ => return,
};
@ -49,5 +51,5 @@ extern "C" fn app_state_change_callback(uid: i32, state: i32) {
#[cfg(feature = "oh")]
#[link(name = "request_service_c")]
extern "C" {
fn RegisterAPPStateCallback(f: extern "C" fn(i32, i32));
fn RegisterAPPStateCallback(f: extern "C" fn(i32, i32, i32));
}

View File

@ -11,7 +11,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::manager::events::EventMessage;
use crate::manage::events::EventMessage;
use crate::service::ability::RequestAbility;
pub(crate) struct NetworkChangeListener;

View File

@ -14,11 +14,11 @@
//! This crate implement the request server service.
pub(crate) mod ability;
pub(crate) mod client;
pub(crate) mod command;
#[allow(unused)]
pub(crate) mod interface;
pub(crate) mod listener;
pub(crate) mod notify;
pub(crate) mod permission;
use std::fs::{File, OpenOptions};
@ -72,6 +72,9 @@ fn on_remote_request(
RequestInterfaceCode::Search => stub.search(data, reply),
RequestInterfaceCode::GetTask => stub.get_task(data, reply),
RequestInterfaceCode::Clear => Ok(()),
RequestInterfaceCode::OpenChannel => stub.open_channel(data, reply),
RequestInterfaceCode::Subscribe => stub.subscribe(data, reply),
RequestInterfaceCode::Unsubscribe => stub.unsubscribe(data, reply),
}
}
@ -95,6 +98,9 @@ impl TryFrom<u32> for RequestInterfaceCode {
_ if code == Self::Search as u32 => Ok(Self::Search),
_ if code == Self::GetTask as u32 => Ok(Self::GetTask),
_ if code == Self::Clear as u32 => Ok(Self::Clear),
_ if code == Self::OpenChannel as u32 => Ok(Self::OpenChannel),
_ if code == Self::Subscribe as u32 => Ok(Self::Subscribe),
_ if code == Self::Unsubscribe as u32 => Ok(Self::Unsubscribe),
_ => Err(IpcStatusCode::Failed),
}
}
@ -179,6 +185,33 @@ pub trait RequestServiceInterface: IRemoteBroker {
fn get_task(&self, _data: &BorrowedMsgParcel, _reply: &mut BorrowedMsgParcel) -> IpcResult<()> {
Ok(())
}
/// open the channel for ipc
fn open_channel(
&self,
_data: &BorrowedMsgParcel,
_reply: &mut BorrowedMsgParcel,
) -> IpcResult<()> {
Ok(())
}
/// subscribe response
fn subscribe(
&self,
_data: &BorrowedMsgParcel,
_reply: &mut BorrowedMsgParcel,
) -> IpcResult<()> {
Ok(())
}
/// unsubscribe response
fn unsubscribe(
&self,
_data: &BorrowedMsgParcel,
_reply: &mut BorrowedMsgParcel,
) -> IpcResult<()> {
Ok(())
}
}
impl RequestServiceInterface for RequestServiceProxy {}
@ -252,6 +285,26 @@ impl RequestServiceInterface for RequestService {
fn get_task(&self, data: &BorrowedMsgParcel, reply: &mut BorrowedMsgParcel) -> IpcResult<()> {
command::GetTask::execute(data, reply)
}
fn open_channel(
&self,
data: &BorrowedMsgParcel,
reply: &mut BorrowedMsgParcel,
) -> IpcResult<()> {
command::OpenChannel::execute(data, reply)
}
fn subscribe(&self, data: &BorrowedMsgParcel, reply: &mut BorrowedMsgParcel) -> IpcResult<()> {
command::Subscribe::execute(data, reply)
}
fn unsubscribe(
&self,
data: &BorrowedMsgParcel,
reply: &mut BorrowedMsgParcel,
) -> IpcResult<()> {
command::Unsubscribe::execute(data, reply)
}
}
pub(crate) fn serialize_task_info(tf: TaskInfo, reply: &mut BorrowedMsgParcel) -> IpcResult<()> {

View File

@ -147,18 +147,15 @@ async fn download_inner(task: Arc<RequestTask>) {
return;
}
};
task.record_response_header(&response);
if !task.handle_response_error(&response).await {
error!("response error");
return;
}
let response = response.unwrap();
if !task.get_file_info(&response) {
return;
}
let mut downloader = build_downloader(task.clone(), response);
let result = downloader.download().await;

View File

@ -20,7 +20,7 @@ use ylong_http_client::HttpClientError;
use ylong_runtime::io::AsyncWrite;
#[cfg(feature = "oh")]
use crate::manager::Notifier;
use crate::manage::Notifier;
use crate::task::config::Version;
use crate::task::info::State;
use crate::task::RequestTask;

View File

@ -35,8 +35,9 @@ use super::info::{CommonTaskInfo, Mode, State, TaskInfo, UpdateInfo};
use super::notify::{EachFileStatus, NotifyData, Progress};
use super::reason::Reason;
use super::upload::upload;
use crate::manager::monitor::IsOnline;
use crate::manager::SystemProxyManager;
use crate::manage::monitor::IsOnline;
use crate::manage::SystemProxyManager;
use crate::service::ability::RequestAbility;
use crate::task::config::{Action, TaskConfig};
use crate::task::ffi::{
GetNetworkInfo, RequestBackgroundNotify, RequestTaskMsg, UpdateRequestTask,
@ -46,7 +47,7 @@ use crate::utils::{get_current_timestamp, hashmap_to_string};
cfg_oh! {
use crate::service::{open_file_readonly, open_file_readwrite, convert_path};
use crate::manager::Notifier;
use crate::manage::Notifier;
}
const SECONDS_IN_ONE_WEEK: u64 = 7 * 24 * 60 * 60;
@ -670,8 +671,31 @@ impl RequestTask {
}
}
pub(crate) fn notify_response(&self, response: &Response) {
let tid = self.conf.common_data.task_id;
let version: String = response.version().as_str().into();
let status_code: u32 = response.status().as_u16() as u32;
let status_message: String;
if let Some(reason) = response.status().reason() {
status_message = reason.into();
} else {
error!("bad status_message {:?}", status_code);
return;
}
let headers = response.headers().clone();
debug!("notify_response");
RequestAbility::client_manager().send_response(
tid,
version,
status_code,
status_message,
headers,
)
}
pub(crate) fn record_response_header(&self, response: &Result<Response, HttpClientError>) {
if let Ok(r) = response {
self.notify_response(r);
let mut guard = self.progress.lock().unwrap();
guard.extras.clear();
for (k, v) in r.headers() {

View File

@ -1283,5 +1283,152 @@ export default function requestOperateTaskTest() {
})
await task.start();
})
/**
* @tc.number: testOnResponse001
* @tc.name: testOnResponse001
* @tc.desc: Test on response
* @tc.size: MediumTest
* @tc.type: Function
* @tc.level: Level 1
* @tc.require:
*/
it('testOnResponse001', 0, async function (done) {
let conf: request.agent.Config = {
action: request.agent.Action.DOWNLOAD,
url: 'https://gitee.com/tiga-ultraman/downloadTests/releases/download/v1.01/test.txt',
saveas: `test.txt`,
mode: request.agent.Mode.FOREGROUND,
overwrite: true,
}
let task = await request.agent.create(context, conf);
task.on('response', (response: request.agent.HttpResponse) => {
expect(response.statusCode).assertEqual(200);
expect(response.version).assertEqual("HTTP/1.1");
expect(response.reason).assertEqual("OK");
done();
})
await task.start();
})
/**
* @tc.number: testOnResponse002
* @tc.name: testOnResponse002
* @tc.desc: Test off response
* @tc.size: MediumTest
* @tc.type: Function
* @tc.level: Level 1
* @tc.require:
*/
it('testOnResponse002', 0, async function (done) {
let conf = {
action: request.agent.Action.DOWNLOAD,
url: 'https://gitee.com/tiga-ultraman/downloadTests/releases/download/v1.01/test.txt',
saveas: `test.txt`,
mode: request.agent.Mode.FOREGROUND,
overwrite: true,
}
let callback = (response: request.agent.HttpResponse) => {
expect(false).assertTrue();
done();
};
let task = await request.agent.create(context, conf);
task.on('response', callback)
task.off('response', callback);
task.on('completed', function (progress) {
expect(progress.state).assertEqual(request.agent.State.COMPLETED);
done();
})
await task.start();
})
/**
* @tc.number: testOnResponse003
* @tc.name: testOnResponse003
* @tc.desc: Test off all response
* @tc.size: MediumTest
* @tc.type: Function
* @tc.level: Level 1
* @tc.require:
*/
it('testOnResponse003', 0, async function (done) {
let conf = {
action: request.agent.Action.DOWNLOAD,
url: 'https://gitee.com/tiga-ultraman/downloadTests/releases/download/v1.01/test.txt',
saveas: `test.txt`,
mode: request.agent.Mode.FOREGROUND,
overwrite: true,
}
let task = await request.agent.create(context, conf);
task.on('response', function (response: request.agent.HttpResponse) {
expect(false).assertTrue();
done();
})
task.on('response', function (response: request.agent.HttpResponse) {
expect(response.statusCode).assertEqual(200);
expect(false).assertTrue();
done();
})
task.off('response');
task.on('completed', function (progress) {
expect(progress.state).assertEqual(request.agent.State.COMPLETED);
done();
})
await task.start();
})
/**
* @tc.number: testOnResponse004
* @tc.name: testOnResponse004
* @tc.desc: Test off with no response
* @tc.size: MediumTest
* @tc.type: Function
* @tc.level: Level 1
* @tc.require:
*/
it('testOnResponse004', 0, async function (done) {
let conf = {
action: request.agent.Action.DOWNLOAD,
url: 'https://gitee.com/tiga-ultraman/downloadTests/releases/download/v1.01/test.txt',
saveas: `test.txt`,
mode: request.agent.Mode.FOREGROUND,
overwrite: true,
}
let task = await request.agent.create(context, conf);
task.off('response');
task.on('completed', function (progress) {
expect(progress.state).assertEqual(request.agent.State.COMPLETED);
done();
})
await task.start();
})
/**
* @tc.number: testOnResponse005
* @tc.name: testOnResponse005
* @tc.desc: Test on response with a bad request
* @tc.size: MediumTest
* @tc.type: Function
* @tc.level: Level 1
* @tc.require:
*/
it('testOnResponse005', 0, async function (done) {
let conf = {
action: request.agent.Action.DOWNLOAD,
url: 'https://gitee.com/tiga-ultraman/downloadTests/releases/download/v1.01/test.jpg',
saveas: `test.txt`,
mode: request.agent.Mode.FOREGROUND,
overwrite: true,
}
let task = await request.agent.create(context, conf);
task.on('response', function (response: request.agent.HttpResponse) {
expect(response.statusCode).assertEqual(404);
expect(response.version).assertEqual("HTTP/1.1");
expect(response.reason).assertEqual("Not Found");
task.off('response');
done();
})
await task.start();
})
})
}