diff --git a/frameworks/js/napi/BUILD.gn b/frameworks/js/napi/BUILD.gn index 41d6be10..8512c726 100644 --- a/frameworks/js/napi/BUILD.gn +++ b/frameworks/js/napi/BUILD.gn @@ -48,6 +48,7 @@ ohos_shared_library("request") { "src/app_state_callback.cpp", "src/async_call.cpp", "src/js_initialize.cpp", + "src/js_response_listener.cpp", "src/js_task.cpp", "src/legacy/download_task.cpp", "src/legacy/request_manager.cpp", @@ -106,6 +107,7 @@ ohos_static_library("request_static") { "src/app_state_callback.cpp", "src/async_call.cpp", "src/js_initialize.cpp", + "src/js_response_listener.cpp", "src/js_task.cpp", "src/legacy/download_task.cpp", "src/legacy/request_manager.cpp", diff --git a/frameworks/js/napi/include/js_common.h b/frameworks/js/napi/include/js_common.h index 554b694c..327d35a2 100644 --- a/frameworks/js/napi/include/js_common.h +++ b/frameworks/js/napi/include/js_common.h @@ -263,5 +263,13 @@ struct DownloadInfo { std::string description; int64_t downloadedBytes; }; + +struct Response { + std::string taskId; + std::string version; + int32_t statusCode; + std::string reason; + std::map> headers; +}; } // namespace OHOS::Request #endif //JS_COMMON_H \ No newline at end of file diff --git a/frameworks/js/napi/include/js_response_listener.h b/frameworks/js/napi/include/js_response_listener.h new file mode 100644 index 00000000..c2af4e0e --- /dev/null +++ b/frameworks/js/napi/include/js_response_listener.h @@ -0,0 +1,50 @@ +/* + * Copyright (c) 2024 Huawei Device Co., Ltd. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef OHOS_REQUEST_JS_RESPONSE_LISTENER_H +#define OHOS_REQUEST_JS_RESPONSE_LISTENER_H + +#include +#include + +#include "i_response_listener.h" +#include "napi/native_api.h" +#include "napi_utils.h" + +namespace OHOS::Request { +class JSResponseListener + : public IResponseListener + , public std::enable_shared_from_this { +public: + JSResponseListener(napi_env env, const std::string &taskId) : env_(env), taskId_(taskId) + { + } + napi_status AddListener(napi_value cb); + napi_status RemoveListener(napi_value cb = nullptr); + void OnResponseReceive(const std::shared_ptr &response) override; + bool HasListener(); + +private: + bool IsListenerAdded(napi_value cb); + +private: + const napi_env env_; + const std::string taskId_; + std::list allCb_; +}; + +} // namespace OHOS::Request + +#endif // OHOS_REQUEST_JS_RESPONSE_LISTENER_H \ No newline at end of file diff --git a/frameworks/js/napi/include/js_task.h b/frameworks/js/napi/include/js_task.h index b4fee0f0..9650fb86 100644 --- a/frameworks/js/napi/include/js_task.h +++ b/frameworks/js/napi/include/js_task.h @@ -18,6 +18,7 @@ #include "async_call.h" #include "js_common.h" +#include "js_response_listener.h" #include "request_notify.h" namespace OHOS::Request { @@ -61,6 +62,7 @@ public: static std::map pathMap_; std::mutex listenerMutex_; std::map>> listenerMap_; + std::shared_ptr responseListener_; private: struct ContextInfo : public AsyncCall::Context { diff --git a/frameworks/js/napi/include/napi_utils.h b/frameworks/js/napi/include/napi_utils.h index 1d4c2c24..a4cd5376 100644 --- a/frameworks/js/napi/include/napi_utils.h +++ b/frameworks/js/napi/include/napi_utils.h @@ -62,7 +62,9 @@ napi_value Convert2JSValue(napi_env env, const std::vector &taskState napi_value Convert2JSValue(napi_env env, const Progress &progress); napi_value Convert2JSValue(napi_env env, TaskInfo &taskInfo); napi_value Convert2JSValue(napi_env env, Config &config); +napi_value Convert2JSValue(napi_env env, const std::shared_ptr &response); napi_value Convert2JSValue(napi_env env, const std::vector &files, const std::vector &forms); +napi_value Convert2JSHeaders(napi_env env, const std::map> &header); napi_value Convert2JSHeadersAndBody(napi_env env, const std::map &header, const std::vector &bodyBytes, bool isSeparate); diff --git a/frameworks/js/napi/src/js_response_listener.cpp b/frameworks/js/napi/src/js_response_listener.cpp new file mode 100644 index 00000000..35ec575a --- /dev/null +++ b/frameworks/js/napi/src/js_response_listener.cpp @@ -0,0 +1,117 @@ +/* + * Copyright (C) 2024 Huawei Device Co., Ltd. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "js_response_listener.h" + +#include "request_manager.h" + +namespace OHOS::Request { + +napi_status JSResponseListener::AddListener(napi_value cb) +{ + if (this->IsListenerAdded(cb)) { + return napi_ok; + } + + napi_ref ref; + napi_status status = napi_create_reference(env_, cb, 1, &ref); + if (status != napi_ok) { + return status; + } + + this->allCb_.push_back(ref); + if (this->allCb_.size() == 1) { + RequestManager::GetInstance()->Subscribe(this->taskId_, shared_from_this()); + } + + return napi_ok; +} + +napi_status JSResponseListener::RemoveListener(napi_value cb) +{ + if (this->allCb_.empty()) { + return napi_ok; + } + + if (cb == nullptr) { + RequestManager::GetInstance()->Unsubscribe(this->taskId_, shared_from_this()); + while (!this->allCb_.empty()) { + napi_ref ref = this->allCb_.front(); + napi_delete_reference(this->env_, ref); + this->allCb_.pop_front(); + } + return napi_ok; + } + + for (auto it = this->allCb_.begin(); it != this->allCb_.end(); it++) { + napi_value copyValue = nullptr; + napi_get_reference_value(this->env_, *it, ©Value); + + bool isEquals = false; + napi_strict_equals(this->env_, cb, copyValue, &isEquals); + if (isEquals) { + napi_delete_reference(this->env_, *it); + this->allCb_.erase(it); + break; + } + } + + if (this->allCb_.empty()) { + RequestManager::GetInstance()->Unsubscribe(this->taskId_, shared_from_this()); + } + + return napi_ok; +} + +void JSResponseListener::OnResponseReceive(const std::shared_ptr &response) +{ + napi_value value = NapiUtils::Convert2JSValue(this->env_, response); + for (auto it = this->allCb_.begin(); it != this->allCb_.end(); it++) { + napi_handle_scope scope = nullptr; + napi_open_handle_scope(this->env_, &scope); + napi_value callbackFunc = nullptr; + napi_get_reference_value(this->env_, *it, &callbackFunc); + + napi_value callbackResult = nullptr; + uint32_t paramNumber = 1; + napi_call_function(this->env_, nullptr, callbackFunc, paramNumber, &value, &callbackResult); + napi_close_handle_scope(this->env_, scope); + } +} + +bool JSResponseListener::IsListenerAdded(napi_value cb) +{ + if (cb == nullptr) { + return true; + } + for (auto it = this->allCb_.begin(); it != this->allCb_.end(); it++) { + napi_value copyValue = nullptr; + napi_get_reference_value(this->env_, *it, ©Value); + + bool isEquals = false; + napi_strict_equals(this->env_, cb, copyValue, &isEquals); + if (isEquals) { + return true; + } + } + + return false; +} + +bool JSResponseListener::HasListener() +{ + return !this->allCb_.empty(); +} +} // namespace OHOS::Request \ No newline at end of file diff --git a/frameworks/js/napi/src/js_task.cpp b/frameworks/js/napi/src/js_task.cpp index 14fccaf6..c09773ef 100644 --- a/frameworks/js/napi/src/js_task.cpp +++ b/frameworks/js/napi/src/js_task.cpp @@ -150,6 +150,8 @@ napi_value JsTask::JsMain(napi_env env, napi_callback_info info, Version version } napi_status status = napi_get_reference_value(context->env_, context->taskRef, result); context->task->SetTid(context->tid); + context->task->responseListener_ = + std::make_shared(context->env_, std::to_string(context->tid)); JsTask::AddTaskMap(std::to_string(context->tid), context->task); JsTask::AddTaskContextMap(std::to_string(context->tid), context); napi_value config = nullptr; @@ -366,6 +368,7 @@ bool JsTask::GetTaskOutput(std::shared_ptr context) } napi_unwrap(context->env_, jsTask, reinterpret_cast(&context->task)); napi_create_reference(context->env_, jsTask, 1, &(context->taskRef)); + context->task->responseListener_ = std::make_shared(context->env_, tid); JsTask::AddTaskMap(tid, context->task); JsTask::AddTaskContextMap(tid, context); } @@ -834,8 +837,13 @@ void JsTask::ReloadListener() { REQUEST_HILOGD("ReloadListener in"); std::lock_guard lockGuard(JsTask::taskMutex_); + RequestManager::GetInstance()->ReopenChannel(); for (const auto &it : taskMap_) { std::string tid = it.first; + if (it.second->responseListener_->HasListener()) { + RequestManager::GetInstance()->Unsubscribe(tid, it.second->responseListener_); + RequestManager::GetInstance()->Subscribe(tid, it.second->responseListener_); + } for (auto itListener : it.second->listenerMap_) { std::string key = itListener.first; if (key.find(tid) == std::string::npos) { diff --git a/frameworks/js/napi/src/napi_utils.cpp b/frameworks/js/napi/src/napi_utils.cpp index 50b5b0cd..36f0ed9b 100644 --- a/frameworks/js/napi/src/napi_utils.cpp +++ b/frameworks/js/napi/src/napi_utils.cpp @@ -427,6 +427,63 @@ napi_value Convert2JSValue(napi_env env, Config &config) return value; } +napi_value Convert2JSValue(napi_env env, const std::shared_ptr &response) +{ + napi_value value = nullptr; + napi_create_object(env, &value); + napi_set_named_property(env, value, "version", Convert2JSValue(env, response->version)); + napi_set_named_property(env, value, "statusCode", Convert2JSValue(env, response->statusCode)); + napi_set_named_property(env, value, "reason", Convert2JSValue(env, response->reason)); + napi_set_named_property(env, value, "headers", Convert2JSHeaders(env, response->headers)); + return value; +} + +napi_value Convert2JSHeaders(napi_env env, const std::map> &headers) +{ + napi_value value = nullptr; + napi_value value2 = nullptr; + napi_value global = nullptr; + napi_value mapConstructor = nullptr; + napi_value mapSet = nullptr; + const uint32_t paramNumber = 2; + napi_value args[paramNumber] = { 0 }; + + napi_status status = napi_get_global(env, &global); + if (status != napi_ok) { + REQUEST_HILOGE("response napi_get_global failed"); + return nullptr; + } + + status = napi_get_named_property(env, global, "Map", &mapConstructor); + if (status != napi_ok) { + REQUEST_HILOGE("response map failed"); + return nullptr; + } + + status = napi_new_instance(env, mapConstructor, 0, nullptr, &value); + if (status != napi_ok) { + REQUEST_HILOGE("response napi_new_instance failed"); + return nullptr; + } + + status = napi_get_named_property(env, value, "set", &mapSet); + if (status != napi_ok) { + REQUEST_HILOGE("response set failed"); + return nullptr; + } + + for (const auto &it : headers) { + args[0] = Convert2JSValue(env, it.first); + args[1] = Convert2JSValue(env, it.second); + status = napi_call_function(env, value, mapSet, paramNumber, args, &value2); + if (status != napi_ok) { + REQUEST_HILOGE("response napi_call_function failed, %{public}d", status); + return nullptr; + } + } + return value; +} + std::string GetSaveas(const std::vector &files, Action action) { if (action == Action::UPLOAD) { diff --git a/frameworks/js/napi/src/notify_stub.cpp b/frameworks/js/napi/src/notify_stub.cpp index b2523517..84533336 100644 --- a/frameworks/js/napi/src/notify_stub.cpp +++ b/frameworks/js/napi/src/notify_stub.cpp @@ -17,9 +17,9 @@ #include +#include "download_server_ipc_interface_code.h" #include "log.h" #include "parcel_helper.h" -#include "download_server_ipc_interface_code.h" #include "request_event.h" namespace OHOS::Request { diff --git a/frameworks/js/napi/src/request_event.cpp b/frameworks/js/napi/src/request_event.cpp index 7eac4441..ca1c6017 100644 --- a/frameworks/js/napi/src/request_event.cpp +++ b/frameworks/js/napi/src/request_event.cpp @@ -29,6 +29,7 @@ static constexpr const char *EVENT_PROGRESS = "progress"; static constexpr const char *EVENT_HEADERRECEIVE = "headerReceive"; static constexpr const char *EVENT_FAIL = "fail"; static constexpr const char *EVENT_COMPLETE = "complete"; +static constexpr const char *EVENT_RESPONSE = "response"; std::unordered_set RequestEvent::supportEventsV9_ = { EVENT_COMPLETE, @@ -46,6 +47,7 @@ std::unordered_set RequestEvent::supportEventsV10_ = { EVENT_PAUSE, EVENT_RESUME, EVENT_REMOVE, + EVENT_RESPONSE, }; std::map RequestEvent::requestEvent_ = { @@ -140,6 +142,16 @@ napi_value RequestEvent::On(napi_env env, napi_callback_info info) return nullptr; } + /* on response */ + if (jsParam.type.compare(EVENT_RESPONSE) == 0) { + napi_status ret = jsParam.task->responseListener_->AddListener(jsParam.callback); + if (ret != napi_ok) { + REQUEST_HILOGE("AddListener fail"); + } + REQUEST_HILOGD("On event %{public}s + %{public}s", jsParam.type.c_str(), jsParam.task->GetTid().c_str()); + return nullptr; + } + sptr listener = new (std::nothrow) RequestNotify(env, jsParam.callback); if (listener == nullptr) { REQUEST_HILOGE("Create callback object fail"); @@ -165,6 +177,15 @@ napi_value RequestEvent::Off(napi_env env, napi_callback_info info) return nullptr; } + /* off response */ + if (jsParam.type.compare(EVENT_RESPONSE) == 0) { + napi_status ret = jsParam.task->responseListener_->RemoveListener(jsParam.callback); + if (ret != napi_ok) { + REQUEST_HILOGE("RemoveListener fail"); + } + return nullptr; + } + if (jsParam.callback == nullptr) { jsParam.task->RemoveListener(jsParam.type, jsParam.task->GetTid(), jsParam.task->config_.version); } else { diff --git a/frameworks/native/BUILD.gn b/frameworks/native/BUILD.gn index 8647db2a..54eac38e 100644 --- a/frameworks/native/BUILD.gn +++ b/frameworks/native/BUILD.gn @@ -48,8 +48,10 @@ ohos_shared_library("request_native") { sources = [ "src/parcel_helper.cpp", "src/request_manager.cpp", + "src/request_manager_impl.cpp", "src/request_service_proxy.cpp", "src/request_sync_load_callback.cpp", + "src/response_message_receiver.cpp", ] deps = [ "../../common:request_common_static" ] @@ -59,6 +61,7 @@ ohos_shared_library("request_native") { "ability_runtime:data_ability_helper", "ability_runtime:napi_base_context", "c_utils:utils", + "eventhandler:libeventhandler", "hilog:libhilog", "ipc:ipc_single", "relational_store:native_dataability", diff --git a/frameworks/native/include/download_server_ipc_interface_code.h b/frameworks/native/include/download_server_ipc_interface_code.h index 7303a73b..0a525f34 100644 --- a/frameworks/native/include/download_server_ipc_interface_code.h +++ b/frameworks/native/include/download_server_ipc_interface_code.h @@ -35,6 +35,9 @@ enum class RequestInterfaceCode { CMD_SEARCH, CMD_GETTASK, CMD_CLEAR, + CMD_OPENCHANNEL, + CMD_SUBSCRIBE, + CMD_UNSUBSCRIBE, }; enum class RequestNotifyInterfaceCode { diff --git a/frameworks/native/include/i_response_listener.h b/frameworks/native/include/i_response_listener.h new file mode 100644 index 00000000..b9625c8e --- /dev/null +++ b/frameworks/native/include/i_response_listener.h @@ -0,0 +1,31 @@ +/* + * Copyright (C) 2024 Huawei Device Co., Ltd. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef OHOS_REQUEST_I_RESPONSE_LISTENER_H +#define OHOS_REQUEST_I_RESPONSE_LISTENER_H + +#include "js_common.h" + +namespace OHOS::Request { + +class IResponseListener { +public: + virtual ~IResponseListener() = default; + virtual void OnResponseReceive(const std::shared_ptr &response) = 0; +}; + +} // namespace OHOS::Request + +#endif // OHOS_REQUEST_I_RESPONSE_LISTENER_H \ No newline at end of file diff --git a/frameworks/native/include/i_response_message_handler.h b/frameworks/native/include/i_response_message_handler.h new file mode 100644 index 00000000..12173ef1 --- /dev/null +++ b/frameworks/native/include/i_response_message_handler.h @@ -0,0 +1,30 @@ +/* + * Copyright (C) 2024 Huawei Device Co., Ltd. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef OHOS_REQUEST_I_RESPONSE_MESSAGE_HANDLER_H +#define OHOS_REQUEST_I_RESPONSE_MESSAGE_HANDLER_H + +#include "i_response_listener.h" + +namespace OHOS::Request { + +class IResponseMessageHandler : public IResponseListener { +public: + virtual void OnChannelBroken() = 0; +}; + +} // namespace OHOS::Request + +#endif // OHOS_REQUEST_I_RESPONSE_MESSAGE_HANDLER_H \ No newline at end of file diff --git a/frameworks/native/include/request.h b/frameworks/native/include/request.h new file mode 100644 index 00000000..785d88c8 --- /dev/null +++ b/frameworks/native/include/request.h @@ -0,0 +1,86 @@ +/* + * Copyright (C) 2024 Huawei Device Co., Ltd. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef OHOS_REQUEST_REQUEST_H +#define OHOS_REQUEST_REQUEST_H + +#include + +#include "i_response_listener.h" + +namespace OHOS::Request { + +class Request { +public: + static constexpr uint32_t EVENT_NONE = 0; + static constexpr uint32_t EVENT_RESPONSE = (1 << 0); + +public: + Request(const std::string &taskId) : taskId_(taskId), events_(0U) + { + } + + const std::string &getId() const + { + return this->taskId_; + } + + size_t AddListener(const std::shared_ptr &listener) + { + this->responseListeners_.emplace(listener); + return this->responseListeners_.size(); + } + + size_t RemoveListener(const std::shared_ptr &listener) + { + this->responseListeners_.erase(listener); + return this->responseListeners_.size(); + } + + bool IsEventSubscribed(uint32_t eventType) + { + return ((events_ & eventType) == eventType); + } + + void MarkEventSubscribed(uint32_t eventType, bool subscribed) + { + if (subscribed) { + events_ |= eventType; + } else { + events_ &= (~eventType); + } + } + + bool HasListener() const + { + return !(this->responseListeners_.empty()); + } + + void OnResponseReceive(const std::shared_ptr &response) + { + for (auto responseListener : responseListeners_) { + responseListener->OnResponseReceive(response); + } + } + +private: + const std::string taskId_; + uint32_t events_; + std::set> responseListeners_; +}; + +} // namespace OHOS::Request + +#endif // OHOS_REQUEST_REQUEST_H \ No newline at end of file diff --git a/frameworks/native/include/request_manager.h b/frameworks/native/include/request_manager.h index 171cb76f..b3b3e48d 100644 --- a/frameworks/native/include/request_manager.h +++ b/frameworks/native/include/request_manager.h @@ -13,38 +13,19 @@ * limitations under the License. */ -#ifndef DOWNLOAD_MANAGER_H -#define DOWNLOAD_MANAGER_H +#ifndef OHOS_REQUEST_DOWNLOAD_MANAGER_H +#define OHOS_REQUEST_DOWNLOAD_MANAGER_H -#include -#include -#include -#include - -#include "constant.h" -#include "data_ability_helper.h" -#include "iremote_object.h" -#include "iservice_registry.h" +#include "i_response_listener.h" #include "js_common.h" #include "notify_stub.h" -#include "refbase.h" -#include "request_service_interface.h" -#include "system_ability_status_change_stub.h" #include "visibility.h" namespace OHOS::Request { -class RequestSaDeathRecipient : public IRemoteObject::DeathRecipient { -public: - explicit RequestSaDeathRecipient(); - ~RequestSaDeathRecipient() = default; - void OnRemoteDied(const wptr &object) override; -}; -class RequestManager : public RefBase { +class RequestManager { public: - RequestManager(); - ~RequestManager(); - REQUEST_API static sptr GetInstance(); + REQUEST_API static const std::unique_ptr &GetInstance(); REQUEST_API int32_t Create(const Config &config, int32_t &tid, sptr listener); REQUEST_API int32_t GetTask(const std::string &tid, const std::string &token, Config &config); REQUEST_API int32_t Start(const std::string &tid); @@ -62,43 +43,20 @@ public: const std::string &type, const std::string &tid, const sptr &listener, Version version); REQUEST_API int32_t Off(const std::string &type, const std::string &tid, Version version); + REQUEST_API int32_t Subscribe(const std::string &taskId, const std::shared_ptr &listener); + REQUEST_API int32_t Unsubscribe(const std::string &taskId, const std::shared_ptr &listener); + REQUEST_API void RestoreListener(void (*callback)()); - void OnRemoteSaDied(const wptr &object); REQUEST_API bool LoadRequestServer(); - REQUEST_API bool IsSaReady(); - void LoadServerSuccess(); - void LoadServerFail(); + REQUEST_API void ReopenChannel(); private: - sptr GetRequestServiceProxy(); - int32_t Retry(int32_t &taskId, const Config &config, int32_t errorCode, sptr listener); - void SetRequestServiceProxy(sptr proxy); - bool SubscribeSA(sptr systemAbilityManager); - -private: - static std::mutex instanceLock_; - static sptr instance_; - std::mutex downloadMutex_; - std::mutex conditionMutex_; - std::mutex serviceProxyMutex_; - - sptr requestServiceProxy_; - sptr deathRecipient_; - sptr saChangeListener_; - std::condition_variable syncCon_; - std::atomic ready_ = false; - static constexpr int LOAD_SA_TIMEOUT_MS = 15000; - void (*callback_)() = nullptr; - -private: - class SystemAbilityStatusChangeListener : public OHOS::SystemAbilityStatusChangeStub { - public: - SystemAbilityStatusChangeListener(); - ~SystemAbilityStatusChangeListener() = default; - virtual void OnAddSystemAbility(int32_t saId, const std::string &deviceId) override; - virtual void OnRemoveSystemAbility(int32_t asId, const std::string &deviceId) override; - }; + RequestManager() = default; + RequestManager(const RequestManager &) = delete; + RequestManager(RequestManager &&) = delete; + RequestManager &operator=(const RequestManager &) = delete; }; + } // namespace OHOS::Request -#endif // DOWNLOAD_MANAGER_H +#endif // OHOS_REQUEST_DOWNLOAD_MANAGER_H diff --git a/frameworks/native/include/request_manager_impl.h b/frameworks/native/include/request_manager_impl.h new file mode 100644 index 00000000..7e5a7c6d --- /dev/null +++ b/frameworks/native/include/request_manager_impl.h @@ -0,0 +1,118 @@ +/* + * Copyright (C) 2024 Huawei Device Co., Ltd. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef OHOS_REQUEST_DOWNLOAD_MANAGER_IMPL_H +#define OHOS_REQUEST_DOWNLOAD_MANAGER_IMPL_H + +#include +#include +#include +#include + +#include "constant.h" +#include "data_ability_helper.h" +#include "i_response_message_handler.h" +#include "iremote_object.h" +#include "iservice_registry.h" +#include "js_common.h" +#include "notify_stub.h" +#include "refbase.h" +#include "request.h" +#include "request_service_interface.h" +#include "response_message_receiver.h" +#include "system_ability_status_change_stub.h" +#include "visibility.h" + +namespace OHOS::Request { +class RequestSaDeathRecipient : public IRemoteObject::DeathRecipient { +public: + explicit RequestSaDeathRecipient(); + ~RequestSaDeathRecipient() = default; + void OnRemoteDied(const wptr &object) override; +}; + +class RequestManagerImpl : public IResponseMessageHandler { +public: + static const std::unique_ptr &GetInstance(); + int32_t Create(const Config &config, int32_t &tid, sptr listener); + int32_t GetTask(const std::string &tid, const std::string &token, Config &config); + int32_t Start(const std::string &tid); + int32_t Stop(const std::string &tid); + int32_t Query(const std::string &tid, TaskInfo &info); + int32_t Touch(const std::string &tid, const std::string &token, TaskInfo &info); + int32_t Search(const Filter &filter, std::vector &tids); + int32_t Show(const std::string &tid, TaskInfo &info); + int32_t Pause(const std::string &tid, Version version); + int32_t QueryMimeType(const std::string &tid, std::string &mimeType); + int32_t Remove(const std::string &tid, Version version); + int32_t Resume(const std::string &tid); + + int32_t On(const std::string &type, const std::string &tid, const sptr &listener, Version version); + int32_t Off(const std::string &type, const std::string &tid, Version version); + + int32_t Subscribe(const std::string &taskId, const std::shared_ptr &listener); + int32_t Unsubscribe(const std::string &taskId, const std::shared_ptr &listener); + + void RestoreListener(void (*callback)()); + bool LoadRequestServer(); + bool IsSaReady(); + void OnRemoteSaDied(const wptr &object); + void LoadServerSuccess(); + void LoadServerFail(); + void ReopenChannel(); + +private: + RequestManagerImpl() = default; + RequestManagerImpl(const RequestManagerImpl &) = delete; + RequestManagerImpl(RequestManagerImpl &&) = delete; + RequestManagerImpl &operator=(const RequestManagerImpl &) = delete; + sptr GetRequestServiceProxy(); + int32_t Retry(int32_t &taskId, const Config &config, int32_t errorCode, sptr listener); + void SetRequestServiceProxy(sptr proxy); + bool SubscribeSA(sptr systemAbilityManager); + int32_t EnsureChannelOpen(); + std::shared_ptr GetTask(const std::string &taskId); + void OnChannelBroken() override; + void OnResponseReceive(const std::shared_ptr &response) override; + +private: + static std::mutex instanceLock_; + static sptr instance_; + std::mutex downloadMutex_; + std::mutex conditionMutex_; + std::mutex serviceProxyMutex_; + + sptr requestServiceProxy_; + sptr deathRecipient_; + sptr saChangeListener_; + std::condition_variable syncCon_; + std::atomic ready_ = false; + static constexpr int LOAD_SA_TIMEOUT_MS = 15000; + void (*callback_)() = nullptr; + std::map> tasks_; + std::shared_ptr msgReceiver_; + +private: + class SystemAbilityStatusChangeListener : public OHOS::SystemAbilityStatusChangeStub { + public: + SystemAbilityStatusChangeListener(); + ~SystemAbilityStatusChangeListener() = default; + virtual void OnAddSystemAbility(int32_t saId, const std::string &deviceId) override; + virtual void OnRemoveSystemAbility(int32_t asId, const std::string &deviceId) override; + }; +}; + +} // namespace OHOS::Request +#endif // OHOS_REQUEST_DOWNLOAD_MANAGER_IMPL_H diff --git a/frameworks/native/include/request_service_interface.h b/frameworks/native/include/request_service_interface.h index c1456fd4..a1e0e772 100644 --- a/frameworks/native/include/request_service_interface.h +++ b/frameworks/native/include/request_service_interface.h @@ -44,6 +44,10 @@ public: virtual int32_t On( const std::string &type, const std::string &tid, const sptr &listener, Version version) = 0; virtual int32_t Off(const std::string &type, const std::string &tid, Version version) = 0; + + virtual int32_t OpenChannel(int32_t &sockFd) = 0; + virtual int32_t Subscribe(const std::string &taskId, int32_t cbType) = 0; + virtual int32_t Unsubscribe(const std::string &taskId, int32_t cbType) = 0; }; } // namespace OHOS::Request #endif // DOWNLOAD_SERVICE_INTERFACE_H \ No newline at end of file diff --git a/frameworks/native/include/request_service_proxy.h b/frameworks/native/include/request_service_proxy.h index c8d78589..a4066685 100644 --- a/frameworks/native/include/request_service_proxy.h +++ b/frameworks/native/include/request_service_proxy.h @@ -45,6 +45,10 @@ public: Version version) override; int32_t Off(const std::string &type, const std::string &tid, Version version) override; + int32_t OpenChannel(int32_t &sockFd) override; + int32_t Subscribe(const std::string &taskId, int32_t cbType) override; + int32_t Unsubscribe(const std::string &taskId, int32_t cbType) override; + private: static void GetVectorData(const Config &config, MessageParcel &data); static inline BrokerDelegator delegator_; diff --git a/frameworks/native/include/response_message_receiver.h b/frameworks/native/include/response_message_receiver.h new file mode 100644 index 00000000..de86721a --- /dev/null +++ b/frameworks/native/include/response_message_receiver.h @@ -0,0 +1,53 @@ +/* + * Copyright (C) 2024 Huawei Device Co., Ltd. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef OHOS_REQUEST_RESPONSE_MESSAGE_RECEIVER_H +#define OHOS_REQUEST_RESPONSE_MESSAGE_RECEIVER_H + +#include "event_handler.h" +#include "event_runner.h" +#include "i_response_message_handler.h" + +namespace OHOS::Request { + +enum MessageType { + HTTP_RESPONSE = 0, +}; + +class ResponseMessageReceiver + : public OHOS::AppExecFwk::FileDescriptorListener + , public std::enable_shared_from_this { +public: + static constexpr uint32_t RESPONSE_MAX_SIZE = 8 * 1024; + static constexpr uint32_t RESPONSE_MAGIC_NUM = 0x43434646; + + ResponseMessageReceiver(IResponseMessageHandler *handler, int32_t sockFd); + void BeginReceive(); + void Shutdown(void); + +private: + void OnReadable(int32_t fd) override; + void OnShutdown(int32_t fd) override; + void OnException(int32_t fd) override; + +private: + IResponseMessageHandler *handler_; + int32_t sockFd_{ -1 }; + int32_t messageId_{ 1 }; +}; + +} // namespace OHOS::Request + +#endif // OHOS_REQUEST_RESPONSE_MESSAGE_RECEIVER_H \ No newline at end of file diff --git a/frameworks/native/src/request_manager.cpp b/frameworks/native/src/request_manager.cpp index a28de0f4..bd11ffeb 100644 --- a/frameworks/native/src/request_manager.cpp +++ b/frameworks/native/src/request_manager.cpp @@ -15,402 +15,112 @@ #include "request_manager.h" -#include - -#include "data_ability_predicates.h" -#include "log.h" -#include "rdb_errno.h" -#include "rdb_helper.h" -#include "rdb_open_callback.h" -#include "rdb_predicates.h" -#include "rdb_store.h" -#include "request_sync_load_callback.h" -#include "result_set.h" -#include "system_ability_definition.h" +#include "request_manager_impl.h" namespace OHOS::Request { -std::mutex RequestManager::instanceLock_; -sptr RequestManager::instance_ = nullptr; -constexpr const int32_t RETRY_INTERVAL = 500 * 1000; -constexpr const int32_t RETRY_MAX_TIMES = 5; -RequestManager::RequestManager() : requestServiceProxy_(nullptr), deathRecipient_(nullptr), saChangeListener_(nullptr) +const std::unique_ptr &RequestManager::GetInstance() { -} - -RequestManager::~RequestManager() -{ -} - -sptr RequestManager::GetInstance() -{ - if (instance_ == nullptr) { - std::lock_guard autoLock(instanceLock_); - if (instance_ == nullptr) { - instance_ = new RequestManager; - } - } - return instance_; + static std::unique_ptr instance(new RequestManager()); + return instance; } int32_t RequestManager::Create(const Config &config, int32_t &tid, sptr listener) { - REQUEST_HILOGD("RequestManager Create start."); - - auto proxy = GetRequestServiceProxy(); - if (proxy == nullptr) { - REQUEST_HILOGE("GetRequestServiceProxy fail."); - return E_SERVICE_ERROR; - } - int32_t ret = proxy->Create(config, tid, listener); - if (ret == E_UNLOADING_SA) { - REQUEST_HILOGE("Service ability is quitting"); - return Retry(tid, config, ret, listener); - } - REQUEST_HILOGD("RequestManager Create end."); - return ret; + return RequestManagerImpl::GetInstance()->Create(config, tid, listener); } - -int32_t RequestManager::Retry(int32_t &taskId, const Config &config, int32_t errorCode, sptr listener) -{ - REQUEST_HILOGD("Retry in"); - int32_t interval = 1; - while (errorCode == E_UNLOADING_SA && interval <= RETRY_MAX_TIMES) { - if (config.action == Action::DOWNLOAD) { - for (auto file : config.files) { - std::remove(file.uri.c_str()); - } - } - - if (errorCode == E_UNLOADING_SA) { - // Waitting for system ability quit - usleep(RETRY_INTERVAL); - } - SetRequestServiceProxy(nullptr); - LoadRequestServer(); - auto proxy = GetRequestServiceProxy(); - if (proxy == nullptr) { - REQUEST_HILOGE("proxy is nullptr!"); - continue; - } - errorCode = proxy->Create(config, taskId, listener); - ++interval; - } - if (errorCode != E_OK && config.action == Action::DOWNLOAD) { - for (auto file : config.files) { - std::remove(file.uri.c_str()); - } - } - return errorCode; -} - -void RequestManager::SetRequestServiceProxy(sptr proxy) -{ - std::lock_guard lock(serviceProxyMutex_); - requestServiceProxy_ = proxy; -} - int32_t RequestManager::GetTask(const std::string &tid, const std::string &token, Config &config) { - REQUEST_HILOGD("GetTask in"); - auto proxy = GetRequestServiceProxy(); - if (proxy == nullptr) { - return E_SERVICE_ERROR; - } - - return proxy->GetTask(tid, token, config); + return RequestManagerImpl::GetInstance()->GetTask(tid, token, config); } - int32_t RequestManager::Start(const std::string &tid) { - REQUEST_HILOGD("Start in"); - auto proxy = GetRequestServiceProxy(); - if (proxy == nullptr) { - return E_SERVICE_ERROR; - } - - return proxy->Start(tid); + return RequestManagerImpl::GetInstance()->Start(tid); } - int32_t RequestManager::Stop(const std::string &tid) { - REQUEST_HILOGD("Stop in"); - auto proxy = GetRequestServiceProxy(); - if (proxy == nullptr) { - return E_SERVICE_ERROR; - } - - return proxy->Stop(tid); + return RequestManagerImpl::GetInstance()->Stop(tid); } int32_t RequestManager::Query(const std::string &tid, TaskInfo &info) { - REQUEST_HILOGD("Query in"); - auto proxy = GetRequestServiceProxy(); - if (proxy == nullptr) { - return E_SERVICE_ERROR; - } - - return proxy->Query(tid, info); + return RequestManagerImpl::GetInstance()->Query(tid, info); } int32_t RequestManager::Touch(const std::string &tid, const std::string &token, TaskInfo &info) { - REQUEST_HILOGD("Touch in"); - auto proxy = GetRequestServiceProxy(); - if (proxy == nullptr) { - return E_SERVICE_ERROR; - } - - return proxy->Touch(tid, token, info); + return RequestManagerImpl::GetInstance()->Touch(tid, token, info); } int32_t RequestManager::Search(const Filter &filter, std::vector &tids) { - REQUEST_HILOGD("Search in"); - auto proxy = GetRequestServiceProxy(); - if (proxy == nullptr) { - return E_SERVICE_ERROR; - } - - return proxy->Search(filter, tids); + return RequestManagerImpl::GetInstance()->Search(filter, tids); } int32_t RequestManager::Show(const std::string &tid, TaskInfo &info) { - REQUEST_HILOGD("Show in"); - auto proxy = GetRequestServiceProxy(); - if (proxy == nullptr) { - return E_SERVICE_ERROR; - } - - return proxy->Show(tid, info); + return RequestManagerImpl::GetInstance()->Show(tid, info); } int32_t RequestManager::Pause(const std::string &tid, Version version) { - REQUEST_HILOGD("Pause in"); - auto proxy = GetRequestServiceProxy(); - if (proxy == nullptr) { - return E_SERVICE_ERROR; - } - - return proxy->Pause(tid, version); + return RequestManagerImpl::GetInstance()->Pause(tid, version); } int32_t RequestManager::QueryMimeType(const std::string &tid, std::string &mimeType) { - REQUEST_HILOGD("QueryMimeType in"); - auto proxy = GetRequestServiceProxy(); - if (proxy == nullptr) { - return E_SERVICE_ERROR; - } - - return proxy->QueryMimeType(tid, mimeType); + return RequestManagerImpl::GetInstance()->QueryMimeType(tid, mimeType); } int32_t RequestManager::Remove(const std::string &tid, Version version) { - REQUEST_HILOGD("Remove in"); - auto proxy = GetRequestServiceProxy(); - if (proxy == nullptr) { - return E_SERVICE_ERROR; - } - - return proxy->Remove(tid, version); + return RequestManagerImpl::GetInstance()->Remove(tid, version); } int32_t RequestManager::Resume(const std::string &tid) { - REQUEST_HILOGD("Resume in"); - auto proxy = GetRequestServiceProxy(); - if (proxy == nullptr) { - return E_SERVICE_ERROR; - } - - return proxy->Resume(tid); + return RequestManagerImpl::GetInstance()->Resume(tid); } int32_t RequestManager::On( const std::string &type, const std::string &tid, const sptr &listener, Version version) { - REQUEST_HILOGD("On in"); - auto proxy = GetRequestServiceProxy(); - if (proxy == nullptr) { - return false; - } - - return proxy->On(type, tid, listener, version); + return RequestManagerImpl::GetInstance()->On(type, tid, listener, version); } int32_t RequestManager::Off(const std::string &type, const std::string &tid, Version version) { - REQUEST_HILOGD("Off in"); - auto proxy = GetRequestServiceProxy(); - if (proxy == nullptr) { - return false; - } - - return proxy->Off(type, tid, version); + return RequestManagerImpl::GetInstance()->Off(type, tid, version); } -sptr RequestManager::GetRequestServiceProxy() +int32_t RequestManager::Subscribe(const std::string &taskId, const std::shared_ptr &listener) { - std::lock_guard lock(serviceProxyMutex_); - if (requestServiceProxy_ != nullptr) { - return requestServiceProxy_; - } - sptr systemAbilityManager = - SystemAbilityManagerClient::GetInstance().GetSystemAbilityManager(); - if (systemAbilityManager == nullptr) { - REQUEST_HILOGE("Getting SystemAbilityManager failed."); - return nullptr; - } - auto systemAbility = systemAbilityManager->GetSystemAbility(DOWNLOAD_SERVICE_ID, ""); - if (systemAbility == nullptr) { - REQUEST_HILOGE("Get SystemAbility failed."); - return nullptr; - } - if (!SubscribeSA(systemAbilityManager)) { - REQUEST_HILOGE("Subscribe SystemAbility failed."); - return nullptr; - } - deathRecipient_ = new RequestSaDeathRecipient(); - systemAbility->AddDeathRecipient(deathRecipient_); - requestServiceProxy_ = iface_cast(systemAbility); - if (requestServiceProxy_ == nullptr) { - REQUEST_HILOGE("Get requestServiceProxy_ fail."); - return nullptr; - } - return requestServiceProxy_; + return RequestManagerImpl::GetInstance()->Subscribe(taskId, listener); } -// Subscribe SA status changes only once -bool RequestManager::SubscribeSA(sptr systemAbilityManager) +int32_t RequestManager::Unsubscribe(const std::string &taskId, const std::shared_ptr &listener) { - if (saChangeListener_ != nullptr) { - return true; - } - saChangeListener_ = new (std::nothrow) SystemAbilityStatusChangeListener(); - if (saChangeListener_ == nullptr) { - REQUEST_HILOGE("Get saChangeListener_ failed."); - return false; - } - if (systemAbilityManager->SubscribeSystemAbility(DOWNLOAD_SERVICE_ID, saChangeListener_) != E_OK) { - REQUEST_HILOGE("SubscribeSystemAbility failed."); - return false; - } - return true; + return RequestManagerImpl::GetInstance()->Unsubscribe(taskId, listener); } void RequestManager::RestoreListener(void (*callback)()) { - callback_ = callback; -} - -RequestManager::SystemAbilityStatusChangeListener::SystemAbilityStatusChangeListener() -{ -} - -void RequestManager::SystemAbilityStatusChangeListener::OnAddSystemAbility(int32_t saId, const std::string &deviceId) -{ - if (saId != DOWNLOAD_SERVICE_ID) { - REQUEST_HILOGE("SA ID is not DOWNLOAD_SERVICE_ID."); - } - REQUEST_HILOGD("SystemAbility Add."); - if (RequestManager::GetInstance()->callback_ != nullptr) { - RequestManager::GetInstance()->callback_(); - } -} - -void RequestManager::SystemAbilityStatusChangeListener::OnRemoveSystemAbility(int32_t saId, const std::string &deviceId) -{ - if (saId != DOWNLOAD_SERVICE_ID) { - REQUEST_HILOGE("SA ID is not DOWNLOAD_SERVICE_ID."); - } - REQUEST_HILOGD("SystemAbility Remove."); -} - -void RequestManager::OnRemoteSaDied(const wptr &remote) -{ - REQUEST_HILOGD(" RequestManager::OnRemoteSaDied"); - ready_.store(false); - SetRequestServiceProxy(nullptr); -} - -RequestSaDeathRecipient::RequestSaDeathRecipient() -{ -} - -void RequestSaDeathRecipient::OnRemoteDied(const wptr &object) -{ - REQUEST_HILOGI("RequestSaDeathRecipient on remote systemAbility died."); - RequestManager::GetInstance()->OnRemoteSaDied(object); + return RequestManagerImpl::GetInstance()->RestoreListener(callback); } bool RequestManager::LoadRequestServer() { - REQUEST_HILOGD("Begin load request server"); - if (ready_.load()) { - REQUEST_HILOGD("GetSystemAbilityManager ready_ true"); - return true; - } - std::lock_guard lock(downloadMutex_); - if (ready_.load()) { - REQUEST_HILOGD("GetSystemAbilityManager ready_ is true"); - return true; - } - - auto sm = SystemAbilityManagerClient::GetInstance().GetSystemAbilityManager(); - if (sm == nullptr) { - REQUEST_HILOGE("GetSystemAbilityManager return null"); - return false; - } - auto systemAbility = sm->CheckSystemAbility(DOWNLOAD_SERVICE_ID); - if (systemAbility != nullptr) { - REQUEST_HILOGI("service already exists"); - return true; - } - sptr loadCallback_ = new (std::nothrow) RequestSyncLoadCallback(); - if (loadCallback_ == nullptr) { - REQUEST_HILOGE("new DownloadAbilityCallback fail"); - return false; - } - - int32_t result = sm->LoadSystemAbility(DOWNLOAD_SERVICE_ID, loadCallback_); - if (result != ERR_OK) { - REQUEST_HILOGE("LoadSystemAbility %{public}d failed, result: %{public}d", DOWNLOAD_SERVICE_ID, result); - return false; - } - - { - std::unique_lock conditionLock(conditionMutex_); - auto waitStatus = syncCon_.wait_for( - conditionLock, std::chrono::milliseconds(LOAD_SA_TIMEOUT_MS), [this]() { return ready_.load(); }); - if (!waitStatus) { - REQUEST_HILOGE("download server load sa timeout"); - return false; - } - } - return true; + return RequestManagerImpl::GetInstance()->LoadRequestServer(); } bool RequestManager::IsSaReady() { - return ready_.load(); + return RequestManagerImpl::GetInstance()->IsSaReady(); } -void RequestManager::LoadServerSuccess() +void RequestManager::ReopenChannel() { - std::unique_lock lock(conditionMutex_); - ready_.store(true); - syncCon_.notify_one(); - REQUEST_HILOGI("load download server success"); + return RequestManagerImpl::GetInstance()->ReopenChannel(); } -void RequestManager::LoadServerFail() -{ - ready_.store(false); - REQUEST_HILOGE("load download server fail"); -} -} // namespace OHOS::Request +} // namespace OHOS::Request \ No newline at end of file diff --git a/frameworks/native/src/request_manager_impl.cpp b/frameworks/native/src/request_manager_impl.cpp new file mode 100644 index 00000000..36d9b342 --- /dev/null +++ b/frameworks/native/src/request_manager_impl.cpp @@ -0,0 +1,512 @@ +/* + * Copyright (C) 2024 Huawei Device Co., Ltd. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "request_manager_impl.h" + +#include +#include + +#include "data_ability_predicates.h" +#include "errors.h" +#include "log.h" +#include "rdb_errno.h" +#include "rdb_helper.h" +#include "rdb_open_callback.h" +#include "rdb_predicates.h" +#include "rdb_store.h" +#include "request_manager.h" +#include "request_sync_load_callback.h" +#include "response_message_receiver.h" +#include "result_set.h" +#include "system_ability_definition.h" + +namespace OHOS::Request { +constexpr const int32_t RETRY_INTERVAL = 500 * 1000; +constexpr const int32_t RETRY_MAX_TIMES = 5; + +const std::unique_ptr &RequestManagerImpl::GetInstance() +{ + static std::unique_ptr instance(new RequestManagerImpl()); + return instance; +} + +int32_t RequestManagerImpl::Create(const Config &config, int32_t &tid, sptr listener) +{ + REQUEST_HILOGD("RequestManagerImpl Create start."); + + auto proxy = GetRequestServiceProxy(); + if (proxy == nullptr) { + REQUEST_HILOGE("GetRequestServiceProxy fail."); + return E_SERVICE_ERROR; + } + int32_t ret = proxy->Create(config, tid, listener); + if (ret == E_UNLOADING_SA) { + REQUEST_HILOGE("Service ability is quitting"); + return Retry(tid, config, ret, listener); + } + REQUEST_HILOGD("RequestManagerImpl Create end."); + return ret; +} + +int32_t RequestManagerImpl::Retry( + int32_t &taskId, const Config &config, int32_t errorCode, sptr listener) +{ + REQUEST_HILOGD("Retry in"); + int32_t interval = 1; + while (errorCode == E_UNLOADING_SA && interval <= RETRY_MAX_TIMES) { + if (config.action == Action::DOWNLOAD) { + for (auto file : config.files) { + std::remove(file.uri.c_str()); + } + } + + if (errorCode == E_UNLOADING_SA) { + // Waitting for system ability quit + usleep(RETRY_INTERVAL); + } + SetRequestServiceProxy(nullptr); + LoadRequestServer(); + auto proxy = GetRequestServiceProxy(); + if (proxy == nullptr) { + REQUEST_HILOGE("proxy is nullptr!"); + continue; + } + errorCode = proxy->Create(config, taskId, listener); + ++interval; + } + if (errorCode != E_OK && config.action == Action::DOWNLOAD) { + for (auto file : config.files) { + std::remove(file.uri.c_str()); + } + } + return errorCode; +} + +void RequestManagerImpl::SetRequestServiceProxy(sptr proxy) +{ + std::lock_guard lock(serviceProxyMutex_); + requestServiceProxy_ = proxy; +} + +int32_t RequestManagerImpl::GetTask(const std::string &tid, const std::string &token, Config &config) +{ + REQUEST_HILOGD("GetTask in"); + auto proxy = GetRequestServiceProxy(); + if (proxy == nullptr) { + return E_SERVICE_ERROR; + } + + return proxy->GetTask(tid, token, config); +} + +int32_t RequestManagerImpl::Start(const std::string &tid) +{ + REQUEST_HILOGD("Start in"); + auto proxy = GetRequestServiceProxy(); + if (proxy == nullptr) { + return E_SERVICE_ERROR; + } + + return proxy->Start(tid); +} + +int32_t RequestManagerImpl::Stop(const std::string &tid) +{ + REQUEST_HILOGD("Stop in"); + auto proxy = GetRequestServiceProxy(); + if (proxy == nullptr) { + return E_SERVICE_ERROR; + } + + return proxy->Stop(tid); +} + +int32_t RequestManagerImpl::Query(const std::string &tid, TaskInfo &info) +{ + REQUEST_HILOGD("Query in"); + auto proxy = GetRequestServiceProxy(); + if (proxy == nullptr) { + return E_SERVICE_ERROR; + } + + return proxy->Query(tid, info); +} + +int32_t RequestManagerImpl::Touch(const std::string &tid, const std::string &token, TaskInfo &info) +{ + REQUEST_HILOGD("Touch in"); + auto proxy = GetRequestServiceProxy(); + if (proxy == nullptr) { + return E_SERVICE_ERROR; + } + + return proxy->Touch(tid, token, info); +} + +int32_t RequestManagerImpl::Search(const Filter &filter, std::vector &tids) +{ + REQUEST_HILOGD("Search in"); + auto proxy = GetRequestServiceProxy(); + if (proxy == nullptr) { + return E_SERVICE_ERROR; + } + + return proxy->Search(filter, tids); +} + +int32_t RequestManagerImpl::Show(const std::string &tid, TaskInfo &info) +{ + REQUEST_HILOGD("Show in"); + auto proxy = GetRequestServiceProxy(); + if (proxy == nullptr) { + return E_SERVICE_ERROR; + } + + return proxy->Show(tid, info); +} + +int32_t RequestManagerImpl::Pause(const std::string &tid, Version version) +{ + REQUEST_HILOGD("Pause in"); + auto proxy = GetRequestServiceProxy(); + if (proxy == nullptr) { + return E_SERVICE_ERROR; + } + + return proxy->Pause(tid, version); +} + +int32_t RequestManagerImpl::QueryMimeType(const std::string &tid, std::string &mimeType) +{ + REQUEST_HILOGD("QueryMimeType in"); + auto proxy = GetRequestServiceProxy(); + if (proxy == nullptr) { + return E_SERVICE_ERROR; + } + + return proxy->QueryMimeType(tid, mimeType); +} + +int32_t RequestManagerImpl::Remove(const std::string &tid, Version version) +{ + REQUEST_HILOGD("Remove in"); + auto proxy = GetRequestServiceProxy(); + if (proxy == nullptr) { + return E_SERVICE_ERROR; + } + + return proxy->Remove(tid, version); +} + +int32_t RequestManagerImpl::Resume(const std::string &tid) +{ + REQUEST_HILOGD("Resume in"); + auto proxy = GetRequestServiceProxy(); + if (proxy == nullptr) { + return E_SERVICE_ERROR; + } + + return proxy->Resume(tid); +} + +int32_t RequestManagerImpl::On( + const std::string &type, const std::string &tid, const sptr &listener, Version version) +{ + REQUEST_HILOGD("On in"); + auto proxy = GetRequestServiceProxy(); + if (proxy == nullptr) { + return false; + } + + return proxy->On(type, tid, listener, version); +} + +int32_t RequestManagerImpl::Off(const std::string &type, const std::string &tid, Version version) +{ + REQUEST_HILOGD("Off in"); + auto proxy = GetRequestServiceProxy(); + if (proxy == nullptr) { + return false; + } + + return proxy->Off(type, tid, version); +} + +int32_t RequestManagerImpl::Subscribe(const std::string &taskId, const std::shared_ptr &listener) +{ + REQUEST_HILOGD("Subscribe in"); + auto proxy = GetRequestServiceProxy(); + if (proxy == nullptr) { + return false; + } + std::shared_ptr task = this->GetTask(taskId); + task->AddListener(listener); + if (task->IsEventSubscribed(Request::EVENT_RESPONSE)) { + return E_OK; + } + + this->EnsureChannelOpen(); + proxy->Subscribe(taskId, Request::EVENT_RESPONSE); + task->MarkEventSubscribed(Request::EVENT_RESPONSE, true); + return E_OK; +} + +int32_t RequestManagerImpl::Unsubscribe(const std::string &taskId, const std::shared_ptr &listener) +{ + REQUEST_HILOGD("Unsubscribe in"); + auto proxy = GetRequestServiceProxy(); + if (proxy == nullptr) { + return false; + } + std::shared_ptr task = this->GetTask(taskId); + size_t size = task->RemoveListener(listener); + if (size != 0U) { + return E_OK; + } + + if (!task->IsEventSubscribed(Request::EVENT_RESPONSE)) { + return E_OK; + } + + proxy->Unsubscribe(taskId, Request::EVENT_RESPONSE); + task->MarkEventSubscribed(Request::EVENT_RESPONSE, false); + return E_OK; +} + +int32_t RequestManagerImpl::EnsureChannelOpen() +{ + if (msgReceiver_) { + return E_OK; + } + + auto proxy = GetRequestServiceProxy(); + if (proxy == nullptr) { + return false; + } + + int32_t sockFd = -1; + int32_t ret = proxy->OpenChannel(sockFd); + if (ret != E_OK) { + return ret; + } + msgReceiver_ = std::make_shared(this, sockFd); + msgReceiver_->BeginReceive(); + return E_OK; +} + +std::shared_ptr RequestManagerImpl::GetTask(const std::string &taskId) +{ + auto it = tasks_.find(taskId); + if (it != tasks_.end()) { + return it->second; + } + + auto retPair = this->tasks_.emplace(taskId, std::make_shared(Request(taskId))); + + if (retPair.second) { + return retPair.first->second; + } + REQUEST_HILOGE("Response Task create fail"); + return std::shared_ptr(); +} + +void RequestManagerImpl::OnChannelBroken() +{ + this->msgReceiver_.reset(); +} + +void RequestManagerImpl::OnResponseReceive(const std::shared_ptr &response) +{ + auto it = tasks_.find(response->taskId); + if (it == tasks_.end()) { + REQUEST_HILOGE("task not found"); + return; + } + + it->second->OnResponseReceive(response); +} + +sptr RequestManagerImpl::GetRequestServiceProxy() +{ + std::lock_guard lock(serviceProxyMutex_); + if (requestServiceProxy_ != nullptr) { + return requestServiceProxy_; + } + sptr systemAbilityManager = + SystemAbilityManagerClient::GetInstance().GetSystemAbilityManager(); + if (systemAbilityManager == nullptr) { + REQUEST_HILOGE("Getting SystemAbilityManager failed."); + return nullptr; + } + auto systemAbility = systemAbilityManager->GetSystemAbility(DOWNLOAD_SERVICE_ID, ""); + if (systemAbility == nullptr) { + REQUEST_HILOGE("Get SystemAbility failed."); + return nullptr; + } + if (!SubscribeSA(systemAbilityManager)) { + REQUEST_HILOGE("Subscribe SystemAbility failed."); + return nullptr; + } + deathRecipient_ = new RequestSaDeathRecipient(); + systemAbility->AddDeathRecipient(deathRecipient_); + requestServiceProxy_ = iface_cast(systemAbility); + if (requestServiceProxy_ == nullptr) { + REQUEST_HILOGE("Get requestServiceProxy_ fail."); + return nullptr; + } + return requestServiceProxy_; +} + +// Subscribe SA status changes only once +bool RequestManagerImpl::SubscribeSA(sptr systemAbilityManager) +{ + if (saChangeListener_ != nullptr) { + return true; + } + saChangeListener_ = new (std::nothrow) SystemAbilityStatusChangeListener(); + if (saChangeListener_ == nullptr) { + REQUEST_HILOGE("Get saChangeListener_ failed."); + return false; + } + if (systemAbilityManager->SubscribeSystemAbility(DOWNLOAD_SERVICE_ID, saChangeListener_) != E_OK) { + REQUEST_HILOGE("SubscribeSystemAbility failed."); + return false; + } + return true; +} + +void RequestManagerImpl::RestoreListener(void (*callback)()) +{ + callback_ = callback; +} + +RequestManagerImpl::SystemAbilityStatusChangeListener::SystemAbilityStatusChangeListener() +{ +} + +void RequestManagerImpl::SystemAbilityStatusChangeListener::OnAddSystemAbility( + int32_t saId, const std::string &deviceId) +{ + if (saId != DOWNLOAD_SERVICE_ID) { + REQUEST_HILOGE("SA ID is not DOWNLOAD_SERVICE_ID."); + } + REQUEST_HILOGD("SystemAbility Add."); + if (RequestManagerImpl::GetInstance()->callback_ != nullptr) { + RequestManagerImpl::GetInstance()->callback_(); + } +} + +void RequestManagerImpl::SystemAbilityStatusChangeListener::OnRemoveSystemAbility( + int32_t saId, const std::string &deviceId) +{ + if (saId != DOWNLOAD_SERVICE_ID) { + REQUEST_HILOGE("SA ID is not DOWNLOAD_SERVICE_ID."); + } + REQUEST_HILOGD("SystemAbility Remove."); +} + +void RequestManagerImpl::OnRemoteSaDied(const wptr &remote) +{ + REQUEST_HILOGD(" RequestManagerImpl::OnRemoteSaDied"); + ready_.store(false); + SetRequestServiceProxy(nullptr); +} + +RequestSaDeathRecipient::RequestSaDeathRecipient() +{ +} + +void RequestSaDeathRecipient::OnRemoteDied(const wptr &object) +{ + REQUEST_HILOGI("RequestSaDeathRecipient on remote systemAbility died."); + RequestManagerImpl::GetInstance()->OnRemoteSaDied(object); +} + +bool RequestManagerImpl::LoadRequestServer() +{ + REQUEST_HILOGD("Begin load request server"); + if (ready_.load()) { + REQUEST_HILOGD("GetSystemAbilityManager ready_ true"); + return true; + } + std::lock_guard lock(downloadMutex_); + if (ready_.load()) { + REQUEST_HILOGD("GetSystemAbilityManager ready_ is true"); + return true; + } + + auto sm = SystemAbilityManagerClient::GetInstance().GetSystemAbilityManager(); + if (sm == nullptr) { + REQUEST_HILOGE("GetSystemAbilityManager return null"); + return false; + } + auto systemAbility = sm->CheckSystemAbility(DOWNLOAD_SERVICE_ID); + if (systemAbility != nullptr) { + REQUEST_HILOGI("service already exists"); + return true; + } + sptr loadCallback_ = new (std::nothrow) RequestSyncLoadCallback(); + if (loadCallback_ == nullptr) { + REQUEST_HILOGE("new DownloadAbilityCallback fail"); + return false; + } + + int32_t result = sm->LoadSystemAbility(DOWNLOAD_SERVICE_ID, loadCallback_); + if (result != E_OK) { + REQUEST_HILOGE("LoadSystemAbility %{public}d failed, result: %{public}d", DOWNLOAD_SERVICE_ID, result); + return false; + } + + { + std::unique_lock conditionLock(conditionMutex_); + auto waitStatus = syncCon_.wait_for( + conditionLock, std::chrono::milliseconds(LOAD_SA_TIMEOUT_MS), [this]() { return ready_.load(); }); + if (!waitStatus) { + REQUEST_HILOGE("download server load sa timeout"); + return false; + } + } + return true; +} + +bool RequestManagerImpl::IsSaReady() +{ + return ready_.load(); +} + +void RequestManagerImpl::LoadServerSuccess() +{ + std::unique_lock lock(conditionMutex_); + ready_.store(true); + syncCon_.notify_one(); + REQUEST_HILOGI("load download server success"); +} + +void RequestManagerImpl::LoadServerFail() +{ + ready_.store(false); + REQUEST_HILOGE("load download server fail"); +} + +void RequestManagerImpl::ReopenChannel() +{ + if (!msgReceiver_) { + return; + } + msgReceiver_->Shutdown(); + this->EnsureChannelOpen(); +} + +} // namespace OHOS::Request diff --git a/frameworks/native/src/request_service_proxy.cpp b/frameworks/native/src/request_service_proxy.cpp index a2c0c846..e5cd32fc 100644 --- a/frameworks/native/src/request_service_proxy.cpp +++ b/frameworks/native/src/request_service_proxy.cpp @@ -21,10 +21,10 @@ #include +#include "download_server_ipc_interface_code.h" #include "iremote_broker.h" #include "log.h" #include "parcel_helper.h" -#include "download_server_ipc_interface_code.h" namespace OHOS::Request { using namespace OHOS::HiviewDFX; @@ -370,4 +370,60 @@ int32_t RequestServiceProxy::Off(const std::string &type, const std::string &tid } return E_OK; } + +int32_t RequestServiceProxy::OpenChannel(int32_t &sockFd) +{ + REQUEST_HILOGD("OpenChannel"); + MessageParcel data, reply; + MessageOption option; + data.WriteInterfaceToken(GetDescriptor()); + int32_t ret = + Remote()->SendRequest(static_cast(RequestInterfaceCode::CMD_OPENCHANNEL), data, reply, option); + if (ret != ERR_NONE) { + REQUEST_HILOGE("send request ret code is %{public}d", ret); + return E_SERVICE_ERROR; + } + int32_t errCode = reply.ReadInt32(); + if (errCode != E_OK) { + return errCode; + } + sockFd = reply.ReadFileDescriptor(); + REQUEST_HILOGD("OpenChannel sockFd: %{public}d", sockFd); + return E_OK; +} + +int32_t RequestServiceProxy::Subscribe(const std::string &taskId, int32_t cbType) +{ + REQUEST_HILOGD("Subscribe"); + MessageParcel data, reply; + MessageOption option; + data.WriteInterfaceToken(GetDescriptor()); + data.WriteString(taskId); + data.WriteInt32(cbType); + int32_t ret = + Remote()->SendRequest(static_cast(RequestInterfaceCode::CMD_SUBSCRIBE), data, reply, option); + if (ret != ERR_NONE) { + REQUEST_HILOGE("send request ret code is %{public}d", ret); + return E_SERVICE_ERROR; + } + return E_OK; +} + +int32_t RequestServiceProxy::Unsubscribe(const std::string &taskId, int32_t cbType) +{ + REQUEST_HILOGD("Unsubscribe"); + MessageParcel data, reply; + MessageOption option; + data.WriteInterfaceToken(GetDescriptor()); + data.WriteString(taskId); + data.WriteInt32(cbType); + int32_t ret = + Remote()->SendRequest(static_cast(RequestInterfaceCode::CMD_UNSUBSCRIBE), data, reply, option); + if (ret != ERR_NONE) { + REQUEST_HILOGE("send request ret code is %{public}d", ret); + return E_SERVICE_ERROR; + } + return E_OK; +} + } // namespace OHOS::Request diff --git a/frameworks/native/src/request_sync_load_callback.cpp b/frameworks/native/src/request_sync_load_callback.cpp index 18b79724..579791b6 100644 --- a/frameworks/native/src/request_sync_load_callback.cpp +++ b/frameworks/native/src/request_sync_load_callback.cpp @@ -19,6 +19,7 @@ #include "isystem_ability_load_callback.h" #include "log.h" #include "request_manager.h" +#include "request_manager_impl.h" #include "system_ability_definition.h" namespace OHOS::Request { @@ -29,7 +30,7 @@ void RequestSyncLoadCallback::OnLoadSystemAbilitySuccess( REQUEST_HILOGE("start systemAbilityId is not download server"); return; } - RequestManager::GetInstance()->LoadServerSuccess(); + RequestManagerImpl::GetInstance()->LoadServerSuccess(); } void RequestSyncLoadCallback::OnLoadSystemAbilityFail(int32_t systemAbilityId) @@ -38,6 +39,6 @@ void RequestSyncLoadCallback::OnLoadSystemAbilityFail(int32_t systemAbilityId) REQUEST_HILOGE("start systemAbilityId is not download server"); return; } - RequestManager::GetInstance()->LoadServerFail(); + RequestManagerImpl::GetInstance()->LoadServerFail(); } } // namespace OHOS::Request \ No newline at end of file diff --git a/frameworks/native/src/response_message_receiver.cpp b/frameworks/native/src/response_message_receiver.cpp new file mode 100644 index 00000000..188d4414 --- /dev/null +++ b/frameworks/native/src/response_message_receiver.cpp @@ -0,0 +1,220 @@ +/* + * Copyright (c) 2024 Huawei Device Co., Ltd. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "response_message_receiver.h" + +#include +#include + +#include +#include +#include + +#include "log.h" + +namespace OHOS::Request { + +static constexpr int32_t INT32_SIZE = 4; +static constexpr int32_t INT16_SIZE = 2; + +std::shared_ptr serviceHandler_; + +// retval == 0 means success, < 0 means failed +static int32_t Int32FromParcel(int32_t &num, char *&parcel, int32_t &size) +{ + if (size < INT32_SIZE) { + REQUEST_HILOGE("message not complete"); + return -1; + } + num = *reinterpret_cast(parcel); + parcel += INT32_SIZE; + size -= INT32_SIZE; + return 0; +} + +static int16_t Int16FromParcel(int16_t &num, char *&parcel, int32_t &size) +{ + if (size < INT16_SIZE) { + REQUEST_HILOGE("message not complete"); + return -1; + } + num = *reinterpret_cast(parcel); + parcel += INT16_SIZE; + size -= INT16_SIZE; + return 0; +} + +static int32_t StringFromParcel(std::string &str, char *&parcel, int32_t &size) +{ + int32_t i = 0; + + while (i < size && parcel[i] != '\0') { + ++i; + } + + if (i < size) { + str.assign(parcel, i); + parcel += (i + 1); + size -= (i + 1); + return 0; + } else { + REQUEST_HILOGE("message not complete"); + return -1; + } +} + +static int32_t ResponseHeaderFromParcel( + std::map> &headers, char *&parcel, int32_t &size) +{ + std::string s(parcel, size); + std::stringstream ss(s); + std::string line; + while (std::getline(ss, line, '\n')) { + std::stringstream keyValue(line); + std::string key, valueLine; + std::getline(keyValue, key, ':'); + std::getline(keyValue, valueLine); + std::stringstream values(valueLine); + std::string value; + while (getline(values, value, ',')) { + headers[key].push_back(value); + } + } + return 0; +} + +ResponseMessageReceiver::ResponseMessageReceiver(IResponseMessageHandler *handler, int32_t sockFd) + : handler_(handler), sockFd_(sockFd) +{ +} + +void ResponseMessageReceiver::BeginReceive() +{ + std::shared_ptr runner = OHOS::AppExecFwk::EventRunner::GetMainEventRunner(); + serviceHandler_ = std::make_shared(runner); + serviceHandler_->AddFileDescriptorListener( + sockFd_, OHOS::AppExecFwk::FILE_DESCRIPTOR_INPUT_EVENT, shared_from_this(), "subscribe"); +} + +// ret 0 if success, ret < 0 if fail +static int32_t MsgHeaderParcel(int32_t &msgId, int16_t &msgType, int16_t &bodySize, char *&parcel, int32_t &size) +{ + int32_t magicNum = 0; + if (Int32FromParcel(magicNum, parcel, size) != 0) { + return -1; + } + if (magicNum != ResponseMessageReceiver::RESPONSE_MAGIC_NUM) { + REQUEST_HILOGE("Bad magic num, %{public}d", magicNum); + return -1; + } + + if (Int32FromParcel(msgId, parcel, size) != 0) { + return -1; + } + if (Int16FromParcel(msgType, parcel, size) != 0) { + return -1; + } + if (Int16FromParcel(bodySize, parcel, size) != 0) { + return -1; + } + return 0; +} + +static int32_t MsgFromParcel(std::shared_ptr &response, char *&parcel, int32_t &size) +{ + int32_t tid; + if (Int32FromParcel(tid, parcel, size) != 0) { + REQUEST_HILOGE("Bad tid"); + return -1; + } + response->taskId = std::to_string(tid); + + if (StringFromParcel(response->version, parcel, size) != 0) { + REQUEST_HILOGE("Bad version"); + return -1; + } + + if (Int32FromParcel(response->statusCode, parcel, size) != 0) { + REQUEST_HILOGE("Bad statusCode"); + return -1; + } + + if (StringFromParcel(response->reason, parcel, size) != 0) { + REQUEST_HILOGE("Bad reason"); + return -1; + } + + if (ResponseHeaderFromParcel(response->headers, parcel, size) != 0) { + REQUEST_HILOGE("Bad headers"); + return -1; + } + return 0; +} + +void ResponseMessageReceiver::OnReadable(int32_t fd) +{ + int32_t msgId; + int16_t msgType; + int16_t headerSize; + std::shared_ptr response = std::make_shared(); + int readSize = ResponseMessageReceiver::RESPONSE_MAX_SIZE; + char buffer[readSize]; + + int32_t length = read(fd, buffer, readSize); + if (length <= 0) { + return; + } + + REQUEST_HILOGD("read response: %{public}d", length); + + char *leftBuf = buffer; + int32_t leftLen = length; + MsgHeaderParcel(msgId, msgType, headerSize, leftBuf, leftLen); + if (msgId != messageId_) { + REQUEST_HILOGE("Bad messageId"); + return; + } + if (headerSize != length) { + REQUEST_HILOGE("Bad headerSize, %{public}d, %{public}d", length, headerSize); + } + ++messageId_; + + if (MsgFromParcel(response, leftBuf, leftLen) == 0) { + this->handler_->OnResponseReceive(response); + } +} + +void ResponseMessageReceiver::OnShutdown(int32_t fd) +{ + serviceHandler_->RemoveFileDescriptorListener(fd); + close(fd); + this->handler_->OnChannelBroken(); +} + +void ResponseMessageReceiver::OnException(int32_t fd) +{ + serviceHandler_->RemoveFileDescriptorListener(fd); + close(fd); + this->handler_->OnChannelBroken(); +} + +void ResponseMessageReceiver::Shutdown() +{ + serviceHandler_->RemoveFileDescriptorListener(sockFd_); + close(sockFd_); + this->handler_->OnChannelBroken(); +} + +} // namespace OHOS::Request \ No newline at end of file diff --git a/services/c_wrapper/include/application_state_observer.h b/services/c_wrapper/include/application_state_observer.h index e8667f2c..e3671407 100644 --- a/services/c_wrapper/include/application_state_observer.h +++ b/services/c_wrapper/include/application_state_observer.h @@ -28,7 +28,7 @@ namespace OHOS::Request { class ApplicationStateObserver { public: ~ApplicationStateObserver(); - using RegCallBack = std::function; + using RegCallBack = std::function; static ApplicationStateObserver &GetInstance(); bool RegisterAppStateChanged(RegCallBack &&callback); @@ -46,7 +46,7 @@ public: void OnProcessDied(const AppExecFwk::ProcessData &processData) override; public: - void RunCallback(int32_t uid, int32_t state); + void RunCallback(int32_t uid, int32_t state, int32_t pid); ApplicationStateObserver &appStateObserver_; }; ApplicationStateObserver(); @@ -58,7 +58,7 @@ public: extern "C" { #endif -typedef void (*APPStateCallback)(int32_t, int32_t); +typedef void (*APPStateCallback)(int32_t, int32_t, int32_t); void RegisterAPPStateCallback(APPStateCallback fun); #ifdef __cplusplus diff --git a/services/c_wrapper/source/application_state_observer.cpp b/services/c_wrapper/source/application_state_observer.cpp index 6e6480a0..12c77059 100644 --- a/services/c_wrapper/source/application_state_observer.cpp +++ b/services/c_wrapper/source/application_state_observer.cpp @@ -84,7 +84,7 @@ void ApplicationStateObserver::AppProcessState::OnAbilityStateChanged( { REQUEST_HILOGD("OnAbilityStateChanged uid=%{public}d, bundleName=%{public}s,state=%{public}d", abilityStateData.uid, abilityStateData.bundleName.c_str(), abilityStateData.abilityState); - RunCallback(abilityStateData.uid, abilityStateData.abilityState); + RunCallback(abilityStateData.uid, abilityStateData.abilityState, abilityStateData.pid); } void ApplicationStateObserver::AppProcessState::OnExtensionStateChanged( @@ -98,18 +98,18 @@ void ApplicationStateObserver::AppProcessState::OnProcessCreated(const AppExecFw void ApplicationStateObserver::AppProcessState::OnProcessDied(const AppExecFwk::ProcessData &processData) { - REQUEST_HILOGD("OnProcessDied uid=%{public}d, bundleName=%{public}s, state=%{public}d", processData.uid, - processData.bundleName.c_str(), static_cast(processData.state)); - RunCallback(processData.uid, static_cast(processData.state)); + REQUEST_HILOGD("OnProcessDied uid=%{public}d, bundleName=%{public}s, state=%{public}d, pid=%{public}d", + processData.uid, processData.bundleName.c_str(), static_cast(processData.state), processData.pid); + RunCallback(processData.uid, static_cast(processData.state), processData.pid); } -void ApplicationStateObserver::AppProcessState::RunCallback(int32_t uid, int32_t state) +void ApplicationStateObserver::AppProcessState::RunCallback(int32_t uid, int32_t state, int32_t pid) { if (appStateObserver_.callback_ == nullptr) { REQUEST_HILOGE("appStateObserver callback is nullptr"); return; } - appStateObserver_.callback_(uid, state); + appStateObserver_.callback_(uid, state, pid); } } // namespace OHOS::Request diff --git a/services/src/lib.rs b/services/src/lib.rs index 69cfd627..0df46730 100644 --- a/services/src/lib.rs +++ b/services/src/lib.rs @@ -30,12 +30,13 @@ extern crate log; mod hilog; mod error; -mod manager; +mod manage; mod task; mod utils; cfg_oh! { mod init; + mod notify; mod trace; mod sys_event; mod service; diff --git a/services/src/manager/events/construct.rs b/services/src/manage/events/construct.rs similarity index 99% rename from services/src/manager/events/construct.rs rename to services/src/manage/events/construct.rs index 730db187..c99f9814 100644 --- a/services/src/manager/events/construct.rs +++ b/services/src/manage/events/construct.rs @@ -17,7 +17,7 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use crate::error::ErrorCode; -use crate::manager::{SystemProxyManager, TaskManager}; +use crate::manage::{SystemProxyManager, TaskManager}; use crate::task::config::{TaskConfig, Version}; use crate::task::ffi::{CTaskConfig, CTaskInfo}; use crate::task::info::State; diff --git a/services/src/manager/events/dump.rs b/services/src/manage/events/dump.rs similarity index 98% rename from services/src/manager/events/dump.rs rename to services/src/manage/events/dump.rs index fbed9a74..a5dc36ee 100644 --- a/services/src/manager/events/dump.rs +++ b/services/src/manage/events/dump.rs @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use crate::manager::TaskManager; +use crate::manage::TaskManager; use crate::task::info::{DumpAllEachInfo, DumpAllInfo, DumpOneInfo}; impl TaskManager { diff --git a/services/src/manager/events/get_task.rs b/services/src/manage/events/get_task.rs similarity index 97% rename from services/src/manager/events/get_task.rs rename to services/src/manage/events/get_task.rs index 74d401e7..03dc7e6f 100644 --- a/services/src/manager/events/get_task.rs +++ b/services/src/manage/events/get_task.rs @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use crate::manager::TaskManager; +use crate::manage::TaskManager; use crate::task::config::TaskConfig; impl TaskManager { diff --git a/services/src/manager/events/mod.rs b/services/src/manage/events/mod.rs similarity index 95% rename from services/src/manager/events/mod.rs rename to services/src/manage/events/mod.rs index 6053b2ea..e250d670 100644 --- a/services/src/manager/events/mod.rs +++ b/services/src/manage/events/mod.rs @@ -176,6 +176,14 @@ impl EventMessage { pub(crate) fn network_change() -> Self { Self::State(StateMessage::NetworkChange) } + + pub(crate) fn subscribe(task_id: u32, token_id: u64) -> (Self, Recv) { + let (tx, rx) = channel::(); + ( + Self::Task(TaskMessage::Subscribe(task_id, token_id, tx)), + Recv::new(rx), + ) + } } pub(crate) enum ServiceMessage { @@ -197,6 +205,7 @@ pub(crate) enum ServiceMessage { pub(crate) enum TaskMessage { Finished(u32), + Subscribe(u32, u64, Sender), } pub(crate) enum StateMessage { @@ -294,6 +303,11 @@ impl Debug for TaskMessage { .debug_struct("Finished") .field("task_id", task_id) .finish(), + Self::Subscribe(task_id, token_id, _) => f + .debug_struct("Subscribe") + .field("task_id", task_id) + .field("token_id", token_id) + .finish(), } } } diff --git a/services/src/manager/events/pause.rs b/services/src/manage/events/pause.rs similarity index 97% rename from services/src/manager/events/pause.rs rename to services/src/manage/events/pause.rs index b7fc4cec..4dc6840c 100644 --- a/services/src/manager/events/pause.rs +++ b/services/src/manage/events/pause.rs @@ -12,7 +12,7 @@ // limitations under the License. use crate::error::ErrorCode; -use crate::manager::TaskManager; +use crate::manage::TaskManager; use crate::task::reason::Reason; impl TaskManager { diff --git a/services/src/manager/events/query.rs b/services/src/manage/events/query.rs similarity index 98% rename from services/src/manager/events/query.rs rename to services/src/manage/events/query.rs index e474b540..a1b6b0cf 100644 --- a/services/src/manager/events/query.rs +++ b/services/src/manage/events/query.rs @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use crate::manager::TaskManager; +use crate::manage::TaskManager; use crate::task::config::Action; use crate::task::ffi::{CTaskInfo, DeleteCTaskInfo}; use crate::task::info::TaskInfo; diff --git a/services/src/manager/events/query_mime_type.rs b/services/src/manage/events/query_mime_type.rs similarity index 98% rename from services/src/manager/events/query_mime_type.rs rename to services/src/manage/events/query_mime_type.rs index 9433e597..d2cab61b 100644 --- a/services/src/manager/events/query_mime_type.rs +++ b/services/src/manage/events/query_mime_type.rs @@ -12,7 +12,7 @@ // limitations under the License. use super::show::Show; -use crate::manager::TaskManager; +use crate::manage::TaskManager; use crate::task::ffi::DeleteCTaskInfo; use crate::task::info::TaskInfo; diff --git a/services/src/manager/events/remove.rs b/services/src/manage/events/remove.rs similarity index 98% rename from services/src/manager/events/remove.rs rename to services/src/manage/events/remove.rs index ba249b1c..21d2bcf2 100644 --- a/services/src/manager/events/remove.rs +++ b/services/src/manage/events/remove.rs @@ -12,7 +12,7 @@ // limitations under the License. use crate::error::ErrorCode; -use crate::manager::TaskManager; +use crate::manage::TaskManager; use crate::task::ffi::ChangeRequestTaskState; use crate::task::info::State; use crate::task::reason::Reason; diff --git a/services/src/manager/events/resume.rs b/services/src/manage/events/resume.rs similarity index 96% rename from services/src/manager/events/resume.rs rename to services/src/manage/events/resume.rs index 244b46cf..537e7851 100644 --- a/services/src/manager/events/resume.rs +++ b/services/src/manage/events/resume.rs @@ -14,11 +14,11 @@ use std::sync::atomic::Ordering; use crate::error::ErrorCode; -use crate::manager::TaskManager; +use crate::manage::TaskManager; use crate::task::info::State; cfg_oh! { - use crate::manager::Notifier; + use crate::manage::Notifier; } impl TaskManager { diff --git a/services/src/manager/events/search.rs b/services/src/manage/events/search.rs similarity index 97% rename from services/src/manager/events/search.rs rename to services/src/manage/events/search.rs index 334db221..6f83995d 100644 --- a/services/src/manager/events/search.rs +++ b/services/src/manage/events/search.rs @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use crate::manager::TaskManager; +use crate::manage::TaskManager; use crate::utils::c_wrapper::{CFilter, CVectorWrapper, DeleteCVectorWrapper}; use crate::utils::filter::Filter; diff --git a/services/src/manager/events/show.rs b/services/src/manage/events/show.rs similarity index 98% rename from services/src/manager/events/show.rs rename to services/src/manage/events/show.rs index fb4a96b2..0ac2e3d9 100644 --- a/services/src/manager/events/show.rs +++ b/services/src/manage/events/show.rs @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use crate::manager::TaskManager; +use crate::manage::TaskManager; use crate::task::ffi::{CTaskInfo, DeleteCTaskInfo}; use crate::task::info::TaskInfo; diff --git a/services/src/manager/events/start.rs b/services/src/manage/events/start.rs similarity index 94% rename from services/src/manager/events/start.rs rename to services/src/manage/events/start.rs index 144a6175..530feea8 100644 --- a/services/src/manager/events/start.rs +++ b/services/src/manage/events/start.rs @@ -15,8 +15,9 @@ use std::sync::atomic::Ordering; use std::sync::Arc; use crate::error::ErrorCode; -use crate::manager::events::{EventMessage, TaskMessage}; -use crate::manager::TaskManager; +use crate::manage::events::{EventMessage, TaskMessage}; +use crate::manage::TaskManager; +use crate::service::ability::RequestAbility; use crate::task::info::{ApplicationState, State}; use crate::task::reason::Reason; use crate::task::request_task::run; @@ -88,6 +89,7 @@ impl TaskManager { ylong_runtime::spawn(async move { run(task.clone()).await; + RequestAbility::client_manager().notify_task_finished(task_id); tx.send(EventMessage::Task(TaskMessage::Finished( task.conf.common_data.task_id, ))) diff --git a/services/src/manager/events/stop.rs b/services/src/manage/events/stop.rs similarity index 98% rename from services/src/manager/events/stop.rs rename to services/src/manage/events/stop.rs index 3bcefb16..5b20df57 100644 --- a/services/src/manager/events/stop.rs +++ b/services/src/manage/events/stop.rs @@ -14,7 +14,7 @@ use std::sync::atomic::Ordering; use crate::error::ErrorCode; -use crate::manager::TaskManager; +use crate::manage::TaskManager; use crate::task::info::State; use crate::task::reason::Reason; diff --git a/services/src/manager/events/touch.rs b/services/src/manage/events/touch.rs similarity index 98% rename from services/src/manager/events/touch.rs rename to services/src/manage/events/touch.rs index e907a53c..911bc611 100644 --- a/services/src/manager/events/touch.rs +++ b/services/src/manage/events/touch.rs @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use crate::manager::TaskManager; +use crate::manage::TaskManager; use crate::task::ffi::{CTaskInfo, DeleteCTaskInfo}; use crate::task::info::TaskInfo; use crate::utils::c_wrapper::CStringWrapper; diff --git a/services/src/manager/mod.rs b/services/src/manage/mod.rs similarity index 100% rename from services/src/manager/mod.rs rename to services/src/manage/mod.rs diff --git a/services/src/manager/monitor.rs b/services/src/manage/monitor.rs similarity index 99% rename from services/src/manager/monitor.rs rename to services/src/manage/monitor.rs index 6a1e82e9..5849e677 100644 --- a/services/src/manager/monitor.rs +++ b/services/src/manage/monitor.rs @@ -15,13 +15,13 @@ use std::sync::atomic::{AtomicU8, Ordering}; use std::sync::Arc; use super::TaskManager; -use crate::manager::scheduled; +use crate::manage::scheduled; use crate::task::config::Action; use crate::task::info::{ApplicationState, Mode, State}; use crate::task::reason::Reason; cfg_oh! { - use crate::manager::Notifier; + use crate::manage::Notifier; } impl TaskManager { diff --git a/services/src/manager/notifier.rs b/services/src/manage/notifier.rs similarity index 97% rename from services/src/manager/notifier.rs rename to services/src/manage/notifier.rs index 31007d74..f0fdc7c5 100644 --- a/services/src/manager/notifier.rs +++ b/services/src/manage/notifier.rs @@ -14,8 +14,8 @@ use std::sync::atomic::{AtomicU8, Ordering}; use std::sync::Arc; +use crate::notify::{Event, NotifyEvent}; use crate::service::ability::RequestAbility; -use crate::service::notify::{Event, NotifyEvent}; use crate::task::config::Version; use crate::task::info::ApplicationState; use crate::task::notify::NotifyData; diff --git a/services/src/manager/qos.rs b/services/src/manage/qos.rs similarity index 100% rename from services/src/manager/qos.rs rename to services/src/manage/qos.rs diff --git a/services/src/manager/scheduled.rs b/services/src/manage/scheduled.rs similarity index 100% rename from services/src/manager/scheduled.rs rename to services/src/manage/scheduled.rs diff --git a/services/src/manager/task_manager.rs b/services/src/manage/task_manager.rs similarity index 94% rename from services/src/manager/task_manager.rs rename to services/src/manage/task_manager.rs index 976f9e6c..c61e7432 100644 --- a/services/src/manager/task_manager.rs +++ b/services/src/manage/task_manager.rs @@ -24,6 +24,7 @@ use super::events::{ use super::qos::{Qos, QosChange, QosQueue}; use super::scheduled; use crate::error::ErrorCode; +use crate::service::ability::PANIC_INFO; use crate::task::config::Version; use crate::task::info::{ApplicationState, State}; use crate::task::reason::Reason; @@ -31,10 +32,8 @@ use crate::task::request_task::RequestTask; use crate::task::tick::Clock; use crate::utils::c_wrapper::CStringWrapper; -static mut TASKMANAGER_PANIC_INFO: Option = None; - cfg_oh! { - use crate::manager::Notifier; + use crate::manage::Notifier; } pub(crate) struct TaskManager { @@ -86,7 +85,7 @@ impl TaskManagerEntry { pub(crate) fn send_event(&self, event: EventMessage) -> bool { if self.tx.send(event).is_err() { unsafe { - if let Some(e) = TASKMANAGER_PANIC_INFO.as_ref() { + if let Some(e) = PANIC_INFO.as_ref() { error!("Sends TaskManager event failed {}", e); } else { info!("TaskManager is unloading") @@ -102,13 +101,6 @@ impl TaskManager { pub(crate) fn init() -> TaskManagerEntry { debug!("TaskManager init"); - std::panic::set_hook(Box::new(|info| { - error!("{}", info.to_string()); - unsafe { - TASKMANAGER_PANIC_INFO = Some(info.to_string()); - } - })); - ylong_runtime::builder::RuntimeBuilder::new_multi_thread() .worker_num(4) .build_global() @@ -187,6 +179,27 @@ impl TaskManager { }; self.after_task_processed(&task); } + TaskMessage::Subscribe(task_id, token_id, tx) => { + if let Some(task) = self.tasks.get(&task_id) { + if task.conf.common_data.token_id == token_id { + let _ = tx.send(ErrorCode::ErrOk); + } else { + let _ = tx.send(ErrorCode::Permission); + } + return; + } + for task in &self.restoring_tasks { + if task.conf.common_data.task_id == task_id { + if task.conf.common_data.token_id == token_id { + let _ = tx.send(ErrorCode::ErrOk); + } else { + let _ = tx.send(ErrorCode::Permission); + } + return; + } + } + let _ = tx.send(ErrorCode::TaskNotFound); + } } } diff --git a/services/src/manager/unload.rs b/services/src/manage/unload.rs similarity index 99% rename from services/src/manager/unload.rs rename to services/src/manage/unload.rs index 99509e05..50314f85 100644 --- a/services/src/manager/unload.rs +++ b/services/src/manage/unload.rs @@ -17,7 +17,7 @@ use std::sync::Arc; use super::task_manager::GetTopBundleName; use super::TaskManager; -use crate::manager::monitor::IsOnline; +use crate::manage::monitor::IsOnline; use crate::task::config::{TaskConfig, Version}; use crate::task::ffi::{CTaskConfig, ChangeRequestTaskState}; use crate::task::info::{ApplicationState, State}; diff --git a/services/src/service/notify/manager.rs b/services/src/notify/manager.rs similarity index 99% rename from services/src/service/notify/manager.rs rename to services/src/notify/manager.rs index 74d67910..a11ebabf 100644 --- a/services/src/service/notify/manager.rs +++ b/services/src/notify/manager.rs @@ -18,7 +18,7 @@ use ylong_runtime::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedS use ylong_runtime::sync::oneshot::Sender; use crate::error::ErrorCode; -use crate::service::notify::{Event, NotifyData, NotifyEvent}; +use crate::notify::{Event, NotifyData, NotifyEvent}; use crate::service::RequestNotifyInterfaceCode; use crate::task::info::State; diff --git a/services/src/service/notify/mod.rs b/services/src/notify/mod.rs similarity index 100% rename from services/src/service/notify/mod.rs rename to services/src/notify/mod.rs diff --git a/services/src/service/ability.rs b/services/src/service/ability.rs index 26f18408..19e0657c 100644 --- a/services/src/service/ability.rs +++ b/services/src/service/ability.rs @@ -30,19 +30,23 @@ use std::hint; use std::mem::MaybeUninit; use std::sync::atomic::{AtomicU8, Ordering}; -use crate::manager::task_manager::TaskManagerEntry; -use crate::manager::TaskManager; +use crate::manage::task_manager::TaskManagerEntry; +use crate::manage::TaskManager; +use crate::notify::{NotifyEntry, NotifyManager}; +use crate::service::client::{ClientManager, ClientManagerEntry}; use crate::service::listener::{AppStateListener, NetworkChangeListener}; -use crate::service::notify::{NotifyEntry, NotifyManager}; static mut REQUEST_ABILITY: MaybeUninit = MaybeUninit::uninit(); static STATE: AtomicU8 = AtomicU8::new(RequestAbility::NOT_INITED); +pub(crate) static mut PANIC_INFO: Option = None; + pub(crate) struct RequestAbility { manager: TaskManagerEntry, notify: NotifyEntry, app: AppStateListener, network: NetworkChangeListener, + client_manager: ClientManagerEntry, } impl RequestAbility { @@ -63,6 +67,14 @@ impl RequestAbility { } pub(crate) fn init() { + std::panic::set_hook(Box::new(|info| unsafe { + let trace = std::backtrace::Backtrace::force_capture(); + let mut info = info.to_string(); + info.push_str(trace.to_string().as_str()); + error!("{}", info); + PANIC_INFO = Some(info); + })); + if STATE .compare_exchange( Self::NOT_INITED, @@ -78,6 +90,7 @@ impl RequestAbility { notify: NotifyManager::init(), app: AppStateListener::init(), network: NetworkChangeListener::init(), + client_manager: ClientManager::init(), }); RequestInitServiceHandler(); }; @@ -113,6 +126,10 @@ impl RequestAbility { pub(crate) fn task_manager() -> TaskManagerEntry { Self::get_instance().manager.clone() } + + pub(crate) fn client_manager() -> ClientManagerEntry { + Self::get_instance().client_manager.clone() + } } #[cfg(feature = "oh")] diff --git a/services/src/service/client/manager.rs b/services/src/service/client/manager.rs new file mode 100644 index 00000000..736de446 --- /dev/null +++ b/services/src/service/client/manager.rs @@ -0,0 +1,261 @@ +// Copyright (C) 2024 Huawei Device Co., Ltd. +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; +use std::net::Shutdown; +use std::os::fd::AsRawFd; + +use ylong_http_client::Headers; +use ylong_runtime::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; +use ylong_runtime::sync::oneshot::Sender; + +use super::{Client, ClientEvent}; +use crate::error::ErrorCode; +use crate::service::ability::PANIC_INFO; + +const REQUEST_MAGIC_NUM: u32 = 0x43434646; +const HEADERS_MAX_SIZE: u16 = 8 * 1024; +const POSITION_OF_LENGTH: u32 = 10; + +pub(crate) enum MessageType { + HttpResponse = 0, +} + +#[derive(Clone)] +pub(crate) struct ClientManagerEntry { + tx: UnboundedSender, +} + +impl ClientManagerEntry { + fn new(tx: UnboundedSender) -> Self { + Self { tx } + } + + pub(crate) fn send_event(&self, event: ClientEvent) -> bool { + if self.tx.send(event).is_err() { + unsafe { + if let Some(e) = PANIC_INFO.as_ref() { + error!("Sends ClientManager event failed {}", e); + } else { + info!("ClientManager is unloading"); + } + } + return false; + } + true + } +} +pub(crate) struct ClientManager { + clients: HashMap, + pid_map: HashMap, + rx: UnboundedReceiver, +} + +impl ClientManager { + pub(crate) fn init() -> ClientManagerEntry { + debug!("ClientManager init"); + let (tx, rx) = unbounded_channel(); + let client_manager = ClientManager { + clients: HashMap::new(), + pid_map: HashMap::new(), + rx, + }; + ylong_runtime::spawn(client_manager.run()); + ClientManagerEntry::new(tx) + } + + async fn run(mut self) { + loop { + let recv = match self.rx.recv().await { + Ok(message) => message, + Err(e) => { + error!("ClientManager recv error {:?}", e); + continue; + } + }; + + match recv { + ClientEvent::OpenChannel(pid, uid, token_id, tx) => { + self.handle_open_channel(pid, uid, token_id, tx) + } + ClientEvent::Subscribe(tid, pid, uid, token_id, tx) => { + self.handle_subscribe(tid, pid, uid, token_id, tx) + } + ClientEvent::Unsubscribe(tid, tx) => self.handle_unsubscribe(tid, tx), + ClientEvent::TaskFinished(tid) => self.handle_task_finished(tid), + ClientEvent::Terminate(pid, tx) => self.handle_process_terminated(pid, tx), + ClientEvent::SendResponse(tid, version, status_code, reason, headers) => { + self.handle_send_response(tid, version, status_code, reason, headers) + .await + } + } + + debug!("ClientManager handle message done"); + } + } + + fn handle_open_channel( + &mut self, + pid: u64, + uid: u64, + token_id: u64, + tx: Sender>, + ) { + let client: &Client; + match self.clients.entry(pid) { + std::collections::hash_map::Entry::Occupied(o) => { + client = o.get(); + let _ = tx.send(Ok(client.client_sock_fd.as_raw_fd())); + } + std::collections::hash_map::Entry::Vacant(v) => { + match Client::constructor(pid, uid, token_id) { + Some(client) => { + let _ = tx.send(Ok(client.client_sock_fd.as_raw_fd())); + v.insert(client); + } + None => { + let _ = tx.send(Err(ErrorCode::Other)); + } + } + } + } + } + + fn handle_subscribe( + &mut self, + tid: u32, + pid: u64, + uid: u64, + token_id: u64, + tx: Sender, + ) { + if let Some(client) = self.clients.get_mut(&pid) { + let ret = client.handle_subscribe(tid, pid, uid, token_id); + if ret == ErrorCode::ErrOk { + self.pid_map.insert(tid, pid); + let _ = tx.send(ErrorCode::ErrOk); + return; + } + } else { + error!("channel not open"); + } + let _ = tx.send(ErrorCode::Other); + } + + fn handle_unsubscribe(&mut self, tid: u32, tx: Sender) { + if let Some(&pid) = self.pid_map.get(&tid) { + self.pid_map.remove(&tid); + if let Some(client) = self.clients.get_mut(&pid) { + client.handle_unsubscribe(tid); + let _ = tx.send(ErrorCode::ErrOk); + return; + } else { + debug!("client not found"); + } + } else { + debug!("unsubscribe tid not found"); + } + let _ = tx.send(ErrorCode::Other); + } + + fn handle_task_finished(&mut self, tid: u32) { + if let Some(&pid) = self.pid_map.get(&tid) { + self.pid_map.remove(&tid); + if let Some(client) = self.clients.get_mut(&pid) { + client.handle_unsubscribe(tid); + } else { + debug!("client not found"); + } + } else { + debug!("unsubscribe tid not found"); + } + } + + fn handle_process_terminated(&mut self, pid: u64, tx: Sender) { + if let Some(client) = self.clients.get_mut(&pid) { + let _ = client.client_sock_fd.shutdown(Shutdown::Both); + let _ = client.server_sock_fd.shutdown(Shutdown::Both); + debug!("client terminate, pid: {}", pid); + for (k, _v) in client.subscribed_map.iter() { + self.pid_map.remove(k); + } + self.clients.remove(&pid); + } else { + debug!("terminate pid not found"); + } + let _ = tx.send(ErrorCode::ErrOk); + } + + async fn handle_send_response( + &mut self, + tid: u32, + version: String, + status_code: u32, + reason: String, + headers: Headers, + ) { + if let Some(&pid) = self.pid_map.get(&tid) { + if let Some(client) = self.clients.get_mut(&pid) { + let mut response = Vec::::new(); + + response.extend_from_slice(&REQUEST_MAGIC_NUM.to_le_bytes()); + + response.extend_from_slice(&client.message_id.to_le_bytes()); + client.message_id += 1; + + let message_type = MessageType::HttpResponse as u16; + response.extend_from_slice(&message_type.to_le_bytes()); + + let message_body_size: u16 = 0; + response.extend_from_slice(&message_body_size.to_le_bytes()); + + response.extend_from_slice(&tid.to_le_bytes()); + + response.extend_from_slice(&version.into_bytes()); + response.push(b'\0'); + + response.extend_from_slice(&status_code.to_le_bytes()); + + response.extend_from_slice(&reason.into_bytes()); + response.push(b'\0'); + + for (k, v) in headers { + response.extend_from_slice(k.as_bytes()); + response.push(b':'); + for (i, sub_value) in v.iter().enumerate() { + if i != 0 { + response.push(b','); + } + response.extend_from_slice(sub_value); + } + response.push(b'\n'); + } + let mut size = response.len() as u16; + if size > HEADERS_MAX_SIZE { + response.truncate(HEADERS_MAX_SIZE as usize); + size = HEADERS_MAX_SIZE; + } + debug!("send response size, {:?}", size); + let size = size.to_le_bytes(); + response[POSITION_OF_LENGTH as usize] = size[0]; + response[(POSITION_OF_LENGTH + 1) as usize] = size[1]; + + client.send_response(tid, response).await; + } else { + debug!("response client not found"); + } + } else { + debug!("response pid not found"); + } + } +} diff --git a/services/src/service/client/mod.rs b/services/src/service/client/mod.rs new file mode 100644 index 00000000..d44bb64a --- /dev/null +++ b/services/src/service/client/mod.rs @@ -0,0 +1,179 @@ +// Copyright (C) 2024 Huawei Device Co., Ltd. +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +mod manager; + +use std::collections::HashMap; + +pub(crate) use manager::{ClientManager, ClientManagerEntry}; +use ylong_http_client::Headers; +use ylong_runtime::net::UnixDatagram; +use ylong_runtime::sync::oneshot::{channel, Sender}; + +use crate::error::ErrorCode; +use crate::utils::Recv; + +pub(crate) enum ClientEvent { + OpenChannel(u64, u64, u64, Sender>), + Subscribe(u32, u64, u64, u64, Sender), + Unsubscribe(u32, Sender), + TaskFinished(u32), + Terminate(u64, Sender), + SendResponse(u32, String, u32, String, Headers), +} + +impl ClientManagerEntry { + pub(crate) fn open_channel(&self, pid: u64, uid: u64, token_id: u64) -> Result { + let (tx, rx) = channel::>(); + let event = ClientEvent::OpenChannel(pid, uid, token_id, tx); + if !self.send_event(event) { + return Err(ErrorCode::Other); + } + let rx = Recv::new(rx); + match rx.get() { + Some(ret) => ret, + None => { + error!("open_channel failed"); + Err(ErrorCode::Other) + } + } + } + + pub(crate) fn subscribe(&self, tid: u32, pid: u64, uid: u64, token_id: u64) -> ErrorCode { + let (tx, rx) = channel::(); + let event = ClientEvent::Subscribe(tid, pid, uid, token_id, tx); + if !self.send_event(event) { + return ErrorCode::Other; + } + let rx = Recv::new(rx); + match rx.get() { + Some(ret) => ret, + None => { + error!("subscribe failed"); + ErrorCode::Other + } + } + } + + pub(crate) fn unsubscribe(&self, tid: u32) -> ErrorCode { + let (tx, rx) = channel::(); + let event = ClientEvent::Unsubscribe(tid, tx); + if !self.send_event(event) { + return ErrorCode::Other; + } + let rx = Recv::new(rx); + match rx.get() { + Some(ret) => ret, + None => { + error!("unsubscribe failed"); + ErrorCode::Other + } + } + } + + pub(crate) fn notify_task_finished(&self, tid: u32) { + let event = ClientEvent::TaskFinished(tid); + self.send_event(event); + } + + pub(crate) fn notify_process_terminate(&self, pid: u64) -> ErrorCode { + let (tx, rx) = channel::(); + let event = ClientEvent::Terminate(pid, tx); + if !self.send_event(event) { + return ErrorCode::Other; + } + let rx = Recv::new(rx); + match rx.get() { + Some(ret) => ret, + None => { + error!("notify_process_terminate failed"); + ErrorCode::Other + } + } + } + + pub(crate) fn send_response( + &self, + tid: u32, + version: String, + status_code: u32, + reason: String, + headers: Headers, + ) { + let event = ClientEvent::SendResponse(tid, version, status_code, reason, headers); + let _ = self.send_event(event); + } +} + +// uid and token_id will be used later +#[allow(dead_code)] +pub(crate) struct Client { + pub(crate) pid: u64, + pub(crate) uid: u64, + pub(crate) token_id: u64, + pub(crate) message_id: u32, + pub(crate) subscribed_map: HashMap, + pub(crate) server_sock_fd: UnixDatagram, + pub(crate) client_sock_fd: UnixDatagram, +} + +impl Client { + pub(crate) fn constructor(pid: u64, uid: u64, token_id: u64) -> Option { + let (server_sock_fd, client_sock_fd) = match UnixDatagram::pair() { + Ok((server_sock_fd, client_sock_fd)) => (server_sock_fd, client_sock_fd), + Err(err) => { + error!("can't create a pair of sockets, {:?}", err); + return None; + } + }; + Some(Client { + pid, + uid, + token_id, + message_id: 1, + subscribed_map: HashMap::new(), + server_sock_fd, + client_sock_fd, + }) + } + + pub(crate) fn handle_subscribe( + &mut self, + tid: u32, + _pid: u64, + _uid: u64, + _token_id: u64, + ) -> ErrorCode { + if let Some(val) = self.subscribed_map.get_mut(&tid) { + *val = true; + } else { + self.subscribed_map.insert(tid, true); + } + + ErrorCode::ErrOk + } + + pub(crate) fn handle_unsubscribe(&mut self, tid: u32) { + if self.subscribed_map.remove(&tid).is_none() { + error!("tid: {} not subscribed", tid); + } + } + + pub(crate) async fn send_response(&mut self, tid: u32, response: Vec) { + if let Some(is_subscribed) = self.subscribed_map.get(&tid) { + if *is_subscribed { + let _ = self.server_sock_fd.send(&response).await; + } + } + } +} diff --git a/services/src/service/command/construct.rs b/services/src/service/command/construct.rs index 87f6e4d4..f11dd7ef 100644 --- a/services/src/service/command/construct.rs +++ b/services/src/service/command/construct.rs @@ -19,7 +19,7 @@ use ipc_rust::{ }; use crate::error::ErrorCode; -use crate::manager::events::EventMessage; +use crate::manage::events::EventMessage; use crate::service::ability::RequestAbility; use crate::service::permission::PermissionChecker; use crate::service::{get_calling_bundle, open_file_readonly, open_file_readwrite}; diff --git a/services/src/service/command/dump.rs b/services/src/service/command/dump.rs index 2ed81201..f10e2854 100644 --- a/services/src/service/command/dump.rs +++ b/services/src/service/command/dump.rs @@ -15,7 +15,7 @@ use std::io::Write; use ipc_rust::{FileDesc, IpcStatusCode, String16}; -use crate::manager::events::EventMessage; +use crate::manage::events::EventMessage; use crate::service::ability::RequestAbility; const HELP_MSG: &str = "usage:\n\ diff --git a/services/src/service/command/get_task.rs b/services/src/service/command/get_task.rs index 4f5cf9cd..c2c95a34 100644 --- a/services/src/service/command/get_task.rs +++ b/services/src/service/command/get_task.rs @@ -14,7 +14,7 @@ use ipc_rust::{get_calling_uid, BorrowedMsgParcel, IpcResult, IpcStatusCode}; use crate::error::ErrorCode; -use crate::manager::events::EventMessage; +use crate::manage::events::EventMessage; use crate::service::ability::RequestAbility; use crate::service::serialize_task_config; diff --git a/services/src/service/command/mod.rs b/services/src/service/command/mod.rs index b5f99f77..9e5c6667 100644 --- a/services/src/service/command/mod.rs +++ b/services/src/service/command/mod.rs @@ -16,6 +16,7 @@ mod dump; mod get_task; mod off; mod on; +mod open_channel; mod pause; mod query; mod query_mime_type; @@ -25,13 +26,16 @@ mod search; mod show; mod start; mod stop; +mod subscribe; mod touch; +mod unsubscribe; pub(crate) use construct::Construct; pub(crate) use dump::Dump; pub(crate) use get_task::GetTask; pub(crate) use off::Off; pub(crate) use on::On; +pub(crate) use open_channel::OpenChannel; pub(crate) use pause::Pause; pub(crate) use query::Query; pub(crate) use query_mime_type::QueryMimeType; @@ -41,4 +45,6 @@ pub(crate) use search::Search; pub(crate) use show::Show; pub(crate) use start::Start; pub(crate) use stop::Stop; +pub(crate) use subscribe::Subscribe; pub(crate) use touch::Touch; +pub(crate) use unsubscribe::Unsubscribe; diff --git a/services/src/service/command/off.rs b/services/src/service/command/off.rs index 81b4bb30..54faed64 100644 --- a/services/src/service/command/off.rs +++ b/services/src/service/command/off.rs @@ -14,8 +14,8 @@ use ipc_rust::{BorrowedMsgParcel, IpcResult, IpcStatusCode}; use crate::error::ErrorCode; +use crate::notify::{Event, NotifyEvent}; use crate::service::ability::RequestAbility; -use crate::service::notify::{Event, NotifyEvent}; use crate::service::permission::PermissionChecker; use crate::task::config::Version; diff --git a/services/src/service/command/on.rs b/services/src/service/command/on.rs index 8b642180..9b76991d 100644 --- a/services/src/service/command/on.rs +++ b/services/src/service/command/on.rs @@ -14,8 +14,8 @@ use ipc_rust::{BorrowedMsgParcel, IpcResult, IpcStatusCode, RemoteObj}; use crate::error::ErrorCode; +use crate::notify::{Event, NotifyEvent}; use crate::service::ability::RequestAbility; -use crate::service::notify::{Event, NotifyEvent}; use crate::service::permission::PermissionChecker; use crate::task::config::Version; diff --git a/services/src/service/command/open_channel.rs b/services/src/service/command/open_channel.rs new file mode 100644 index 00000000..4cfc5fa1 --- /dev/null +++ b/services/src/service/command/open_channel.rs @@ -0,0 +1,52 @@ +// Copyright (C) 2024 Huawei Device Co., Ltd. +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::fs::File; +use std::os::unix::io::FromRawFd; + +use ipc_rust::{ + get_calling_pid, get_calling_token_id, get_calling_uid, BorrowedMsgParcel, FileDesc, IpcResult, + IpcStatusCode, +}; + +use crate::error::ErrorCode; +use crate::service::ability::RequestAbility; + +pub(crate) struct OpenChannel; + +impl OpenChannel { + pub(crate) fn execute( + _data: &BorrowedMsgParcel, + reply: &mut BorrowedMsgParcel, + ) -> IpcResult<()> { + info!("open channnel"); + let pid = get_calling_pid(); + let uid = get_calling_uid(); + let token_id = get_calling_token_id(); + match RequestAbility::client_manager().open_channel(pid, uid, token_id) { + Ok(fd) => { + let file = unsafe { File::from_raw_fd(fd) }; + let file = FileDesc::new(file); + reply.write(&(ErrorCode::ErrOk as i32))?; + reply.write(&file)?; + info!("open channnel ok "); + Ok(()) + } + Err(_) => { + error!("open_channel failed"); + reply.write(&(ErrorCode::ParameterCheck as i32))?; + Err(IpcStatusCode::Failed) + } + } + } +} diff --git a/services/src/service/command/pause.rs b/services/src/service/command/pause.rs index f9549607..16d97d07 100644 --- a/services/src/service/command/pause.rs +++ b/services/src/service/command/pause.rs @@ -14,7 +14,7 @@ use ipc_rust::{get_calling_uid, BorrowedMsgParcel, IpcResult, IpcStatusCode}; use crate::error::ErrorCode; -use crate::manager::events::EventMessage; +use crate::manage::events::EventMessage; use crate::service::ability::RequestAbility; use crate::service::permission::PermissionChecker; use crate::task::config::Version; diff --git a/services/src/service/command/query.rs b/services/src/service/command/query.rs index 3d721fdd..a2f20082 100644 --- a/services/src/service/command/query.rs +++ b/services/src/service/command/query.rs @@ -14,7 +14,7 @@ use ipc_rust::{BorrowedMsgParcel, IpcResult, IpcStatusCode}; use crate::error::ErrorCode; -use crate::manager::events::EventMessage; +use crate::manage::events::EventMessage; use crate::service::ability::RequestAbility; use crate::service::permission::{PermissionChecker, QueryPermission}; use crate::service::{is_system_api, serialize_task_info}; diff --git a/services/src/service/command/query_mime_type.rs b/services/src/service/command/query_mime_type.rs index aee47e46..6bbcb96e 100644 --- a/services/src/service/command/query_mime_type.rs +++ b/services/src/service/command/query_mime_type.rs @@ -14,7 +14,7 @@ use ipc_rust::{get_calling_uid, BorrowedMsgParcel, IpcResult, IpcStatusCode}; use crate::error::ErrorCode; -use crate::manager::events::EventMessage; +use crate::manage::events::EventMessage; use crate::service::ability::RequestAbility; use crate::service::permission::PermissionChecker; diff --git a/services/src/service/command/remove.rs b/services/src/service/command/remove.rs index 5ba5326a..718ea235 100644 --- a/services/src/service/command/remove.rs +++ b/services/src/service/command/remove.rs @@ -14,7 +14,7 @@ use ipc_rust::{get_calling_uid, BorrowedMsgParcel, IpcResult, IpcStatusCode}; use crate::error::ErrorCode; -use crate::manager::events::EventMessage; +use crate::manage::events::EventMessage; use crate::service::ability::RequestAbility; use crate::service::permission::PermissionChecker; use crate::task::config::Version; diff --git a/services/src/service/command/resume.rs b/services/src/service/command/resume.rs index 2d5b8036..b97eacf6 100644 --- a/services/src/service/command/resume.rs +++ b/services/src/service/command/resume.rs @@ -14,7 +14,7 @@ use ipc_rust::{get_calling_uid, BorrowedMsgParcel, IpcResult, IpcStatusCode}; use crate::error::ErrorCode; -use crate::manager::events::EventMessage; +use crate::manage::events::EventMessage; use crate::service::ability::RequestAbility; use crate::service::permission::PermissionChecker; diff --git a/services/src/service/command/search.rs b/services/src/service/command/search.rs index 6261ed33..34c6f1d4 100644 --- a/services/src/service/command/search.rs +++ b/services/src/service/command/search.rs @@ -13,7 +13,7 @@ use ipc_rust::{BorrowedMsgParcel, IpcResult, IpcStatusCode}; -use crate::manager::events::EventMessage; +use crate::manage::events::EventMessage; use crate::service::ability::RequestAbility; use crate::service::{get_calling_bundle, is_system_api}; use crate::utils::filter::{CommonFilter, Filter}; diff --git a/services/src/service/command/show.rs b/services/src/service/command/show.rs index d4c995a1..672a9c40 100644 --- a/services/src/service/command/show.rs +++ b/services/src/service/command/show.rs @@ -14,7 +14,7 @@ use ipc_rust::{get_calling_uid, BorrowedMsgParcel, IpcResult, IpcStatusCode}; use crate::error::ErrorCode; -use crate::manager::events::EventMessage; +use crate::manage::events::EventMessage; use crate::service::ability::RequestAbility; use crate::service::permission::PermissionChecker; use crate::service::serialize_task_info; diff --git a/services/src/service/command/start.rs b/services/src/service/command/start.rs index a2fc8d0c..5c7ab3df 100644 --- a/services/src/service/command/start.rs +++ b/services/src/service/command/start.rs @@ -14,7 +14,7 @@ use ipc_rust::{get_calling_uid, BorrowedMsgParcel, IpcResult, IpcStatusCode}; use crate::error::ErrorCode; -use crate::manager::events::EventMessage; +use crate::manage::events::EventMessage; use crate::service::ability::RequestAbility; use crate::service::permission::PermissionChecker; diff --git a/services/src/service/command/stop.rs b/services/src/service/command/stop.rs index ee3b557f..fc9d6be9 100644 --- a/services/src/service/command/stop.rs +++ b/services/src/service/command/stop.rs @@ -14,7 +14,7 @@ use ipc_rust::{get_calling_uid, BorrowedMsgParcel, IpcResult, IpcStatusCode}; use crate::error::ErrorCode; -use crate::manager::events::EventMessage; +use crate::manage::events::EventMessage; use crate::service::ability::RequestAbility; pub(crate) struct Stop; diff --git a/services/src/service/command/subscribe.rs b/services/src/service/command/subscribe.rs new file mode 100644 index 00000000..9b085e24 --- /dev/null +++ b/services/src/service/command/subscribe.rs @@ -0,0 +1,78 @@ +// Copyright (C) 2024 Huawei Device Co., Ltd. +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use ipc_rust::{ + get_calling_pid, get_calling_token_id, get_calling_uid, BorrowedMsgParcel, IpcResult, + IpcStatusCode, +}; + +use crate::error::ErrorCode; +use crate::manage::events::EventMessage; +use crate::service::ability::RequestAbility; + +pub(crate) struct Subscribe; + +impl Subscribe { + pub(crate) fn execute( + data: &BorrowedMsgParcel, + reply: &mut BorrowedMsgParcel, + ) -> IpcResult<()> { + info!("subscribe"); + let tid: String = data.read()?; + debug!("Service subscribe: task_id is {}", tid); + let pid = get_calling_pid(); + let uid = get_calling_uid(); + let token_id = get_calling_token_id(); + match tid.parse::() { + Ok(tid) => { + let (event, rx) = EventMessage::subscribe(tid, token_id); + if !RequestAbility::task_manager().send_event(event) { + reply.write(&(ErrorCode::Other as i32))?; + error!("send event failed"); + return Err(IpcStatusCode::Failed); + } + let ret = match rx.get() { + Some(ret) => ret, + None => { + error!("Service construct: receives ret failed"); + reply.write(&(ErrorCode::Other as i32))?; + return Err(IpcStatusCode::Failed); + } + }; + + if ret != ErrorCode::ErrOk { + error!("subscribe failed: {:?}", ret); + reply.write(&(ret as i32))?; + return Err(IpcStatusCode::Failed); + } + + if RequestAbility::client_manager().subscribe(tid, pid, uid, token_id) + == ErrorCode::ErrOk + { + reply.write(&(ErrorCode::ErrOk as i32))?; + info!("subscribe ok"); + Ok(()) + } else { + error!("subscribe failed"); + reply.write(&(ErrorCode::TaskNotFound as i32))?; + Err(IpcStatusCode::Failed) + } + } + _ => { + error!("Service subscribe: task_id not valid"); + reply.write(&(ErrorCode::TaskNotFound as i32))?; + Err(IpcStatusCode::Failed) + } + } + } +} diff --git a/services/src/service/command/touch.rs b/services/src/service/command/touch.rs index a20b10f9..beb0c2af 100644 --- a/services/src/service/command/touch.rs +++ b/services/src/service/command/touch.rs @@ -14,7 +14,7 @@ use ipc_rust::{get_calling_uid, BorrowedMsgParcel, IpcResult, IpcStatusCode}; use crate::error::ErrorCode; -use crate::manager::events::EventMessage; +use crate::manage::events::EventMessage; use crate::service::ability::RequestAbility; use crate::service::serialize_task_info; diff --git a/services/src/service/command/unsubscribe.rs b/services/src/service/command/unsubscribe.rs new file mode 100644 index 00000000..d7e65162 --- /dev/null +++ b/services/src/service/command/unsubscribe.rs @@ -0,0 +1,47 @@ +// Copyright (C) 2024 Huawei Device Co., Ltd. +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use ipc_rust::{BorrowedMsgParcel, IpcResult, IpcStatusCode}; + +use crate::error::ErrorCode; +use crate::service::ability::RequestAbility; + +pub(crate) struct Unsubscribe; + +impl Unsubscribe { + pub(crate) fn execute( + data: &BorrowedMsgParcel, + reply: &mut BorrowedMsgParcel, + ) -> IpcResult<()> { + info!("subscribe"); + let tid: String = data.read()?; + debug!("Service unsubscribe: task_id is {}", tid); + match tid.parse::() { + Ok(tid) => { + if RequestAbility::client_manager().unsubscribe(tid) == ErrorCode::ErrOk { + reply.write(&(ErrorCode::ErrOk as i32))?; + Ok(()) + } else { + error!("unsubscribe failed"); + reply.write(&(ErrorCode::TaskNotFound as i32))?; // 错误码待统一处理 + Err(IpcStatusCode::Failed) + } + } + _ => { + error!("Service unsubscribe: task_id not valid"); + reply.write(&(ErrorCode::TaskNotFound as i32))?; + Err(IpcStatusCode::Failed) + } + } + } +} diff --git a/services/src/service/interface.rs b/services/src/service/interface.rs index 36a2c3a8..659e4bcf 100644 --- a/services/src/service/interface.rs +++ b/services/src/service/interface.rs @@ -44,6 +44,12 @@ pub(crate) enum RequestInterfaceCode { GetTask, /// system api deletes specifed tasks Clear, + /// open the channel for ipc + OpenChannel, + /// subscribe response + Subscribe, + /// unsubscribe response + Unsubscribe, } /// Function code of RequestNotifyInterfaceCode diff --git a/services/src/service/listener/app.rs b/services/src/service/listener/app.rs index acd8d791..947b1664 100644 --- a/services/src/service/listener/app.rs +++ b/services/src/service/listener/app.rs @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use crate::manager::events::EventMessage; +use crate::manage::events::EventMessage; use crate::service::ability::RequestAbility; use crate::task::info::ApplicationState; @@ -33,13 +33,15 @@ impl AppStateListener { } } -extern "C" fn app_state_change_callback(uid: i32, state: i32) { +extern "C" fn app_state_change_callback(uid: i32, state: i32, pid: i32) { info!("Receives app state change callback"); - let state = match state { 2 => ApplicationState::Foreground, 4 => ApplicationState::Background, - 5 => ApplicationState::Terminated, + 5 => { + RequestAbility::client_manager().notify_process_terminate(pid as u64); + ApplicationState::Terminated + } _ => return, }; @@ -49,5 +51,5 @@ extern "C" fn app_state_change_callback(uid: i32, state: i32) { #[cfg(feature = "oh")] #[link(name = "request_service_c")] extern "C" { - fn RegisterAPPStateCallback(f: extern "C" fn(i32, i32)); + fn RegisterAPPStateCallback(f: extern "C" fn(i32, i32, i32)); } diff --git a/services/src/service/listener/network.rs b/services/src/service/listener/network.rs index e0cb0c43..559dc23a 100644 --- a/services/src/service/listener/network.rs +++ b/services/src/service/listener/network.rs @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use crate::manager::events::EventMessage; +use crate::manage::events::EventMessage; use crate::service::ability::RequestAbility; pub(crate) struct NetworkChangeListener; diff --git a/services/src/service/mod.rs b/services/src/service/mod.rs index 4fdbc8d9..0e3b57f6 100644 --- a/services/src/service/mod.rs +++ b/services/src/service/mod.rs @@ -14,11 +14,11 @@ //! This crate implement the request server service. pub(crate) mod ability; +pub(crate) mod client; pub(crate) mod command; #[allow(unused)] pub(crate) mod interface; pub(crate) mod listener; -pub(crate) mod notify; pub(crate) mod permission; use std::fs::{File, OpenOptions}; @@ -72,6 +72,9 @@ fn on_remote_request( RequestInterfaceCode::Search => stub.search(data, reply), RequestInterfaceCode::GetTask => stub.get_task(data, reply), RequestInterfaceCode::Clear => Ok(()), + RequestInterfaceCode::OpenChannel => stub.open_channel(data, reply), + RequestInterfaceCode::Subscribe => stub.subscribe(data, reply), + RequestInterfaceCode::Unsubscribe => stub.unsubscribe(data, reply), } } @@ -95,6 +98,9 @@ impl TryFrom for RequestInterfaceCode { _ if code == Self::Search as u32 => Ok(Self::Search), _ if code == Self::GetTask as u32 => Ok(Self::GetTask), _ if code == Self::Clear as u32 => Ok(Self::Clear), + _ if code == Self::OpenChannel as u32 => Ok(Self::OpenChannel), + _ if code == Self::Subscribe as u32 => Ok(Self::Subscribe), + _ if code == Self::Unsubscribe as u32 => Ok(Self::Unsubscribe), _ => Err(IpcStatusCode::Failed), } } @@ -179,6 +185,33 @@ pub trait RequestServiceInterface: IRemoteBroker { fn get_task(&self, _data: &BorrowedMsgParcel, _reply: &mut BorrowedMsgParcel) -> IpcResult<()> { Ok(()) } + + /// open the channel for ipc + fn open_channel( + &self, + _data: &BorrowedMsgParcel, + _reply: &mut BorrowedMsgParcel, + ) -> IpcResult<()> { + Ok(()) + } + + /// subscribe response + fn subscribe( + &self, + _data: &BorrowedMsgParcel, + _reply: &mut BorrowedMsgParcel, + ) -> IpcResult<()> { + Ok(()) + } + + /// unsubscribe response + fn unsubscribe( + &self, + _data: &BorrowedMsgParcel, + _reply: &mut BorrowedMsgParcel, + ) -> IpcResult<()> { + Ok(()) + } } impl RequestServiceInterface for RequestServiceProxy {} @@ -252,6 +285,26 @@ impl RequestServiceInterface for RequestService { fn get_task(&self, data: &BorrowedMsgParcel, reply: &mut BorrowedMsgParcel) -> IpcResult<()> { command::GetTask::execute(data, reply) } + + fn open_channel( + &self, + data: &BorrowedMsgParcel, + reply: &mut BorrowedMsgParcel, + ) -> IpcResult<()> { + command::OpenChannel::execute(data, reply) + } + + fn subscribe(&self, data: &BorrowedMsgParcel, reply: &mut BorrowedMsgParcel) -> IpcResult<()> { + command::Subscribe::execute(data, reply) + } + + fn unsubscribe( + &self, + data: &BorrowedMsgParcel, + reply: &mut BorrowedMsgParcel, + ) -> IpcResult<()> { + command::Unsubscribe::execute(data, reply) + } } pub(crate) fn serialize_task_info(tf: TaskInfo, reply: &mut BorrowedMsgParcel) -> IpcResult<()> { diff --git a/services/src/task/download.rs b/services/src/task/download.rs index fb9145e3..2628f0ee 100644 --- a/services/src/task/download.rs +++ b/services/src/task/download.rs @@ -147,18 +147,15 @@ async fn download_inner(task: Arc) { return; } }; - task.record_response_header(&response); if !task.handle_response_error(&response).await { error!("response error"); return; } let response = response.unwrap(); - if !task.get_file_info(&response) { return; } - let mut downloader = build_downloader(task.clone(), response); let result = downloader.download().await; diff --git a/services/src/task/operator.rs b/services/src/task/operator.rs index 98cdb874..e24daa21 100644 --- a/services/src/task/operator.rs +++ b/services/src/task/operator.rs @@ -20,7 +20,7 @@ use ylong_http_client::HttpClientError; use ylong_runtime::io::AsyncWrite; #[cfg(feature = "oh")] -use crate::manager::Notifier; +use crate::manage::Notifier; use crate::task::config::Version; use crate::task::info::State; use crate::task::RequestTask; diff --git a/services/src/task/request_task.rs b/services/src/task/request_task.rs index 07748292..60313dfe 100644 --- a/services/src/task/request_task.rs +++ b/services/src/task/request_task.rs @@ -35,8 +35,9 @@ use super::info::{CommonTaskInfo, Mode, State, TaskInfo, UpdateInfo}; use super::notify::{EachFileStatus, NotifyData, Progress}; use super::reason::Reason; use super::upload::upload; -use crate::manager::monitor::IsOnline; -use crate::manager::SystemProxyManager; +use crate::manage::monitor::IsOnline; +use crate::manage::SystemProxyManager; +use crate::service::ability::RequestAbility; use crate::task::config::{Action, TaskConfig}; use crate::task::ffi::{ GetNetworkInfo, RequestBackgroundNotify, RequestTaskMsg, UpdateRequestTask, @@ -46,7 +47,7 @@ use crate::utils::{get_current_timestamp, hashmap_to_string}; cfg_oh! { use crate::service::{open_file_readonly, open_file_readwrite, convert_path}; - use crate::manager::Notifier; + use crate::manage::Notifier; } const SECONDS_IN_ONE_WEEK: u64 = 7 * 24 * 60 * 60; @@ -670,8 +671,31 @@ impl RequestTask { } } + pub(crate) fn notify_response(&self, response: &Response) { + let tid = self.conf.common_data.task_id; + let version: String = response.version().as_str().into(); + let status_code: u32 = response.status().as_u16() as u32; + let status_message: String; + if let Some(reason) = response.status().reason() { + status_message = reason.into(); + } else { + error!("bad status_message {:?}", status_code); + return; + } + let headers = response.headers().clone(); + debug!("notify_response"); + RequestAbility::client_manager().send_response( + tid, + version, + status_code, + status_message, + headers, + ) + } + pub(crate) fn record_response_header(&self, response: &Result) { if let Ok(r) = response { + self.notify_response(r); let mut guard = self.progress.lock().unwrap(); guard.extras.clear(); for (k, v) in r.headers() { diff --git a/test/unittest/js_test/requestAgentTaskTest/entry/src/main/ets/test/RequestOperateTask.test.ets b/test/unittest/js_test/requestAgentTaskTest/entry/src/main/ets/test/RequestOperateTask.test.ets index 32bf5cfd..301e6372 100644 --- a/test/unittest/js_test/requestAgentTaskTest/entry/src/main/ets/test/RequestOperateTask.test.ets +++ b/test/unittest/js_test/requestAgentTaskTest/entry/src/main/ets/test/RequestOperateTask.test.ets @@ -1283,5 +1283,152 @@ export default function requestOperateTaskTest() { }) await task.start(); }) + + /** + * @tc.number: testOnResponse001 + * @tc.name: testOnResponse001 + * @tc.desc: Test on response + * @tc.size: MediumTest + * @tc.type: Function + * @tc.level: Level 1 + * @tc.require: + */ + it('testOnResponse001', 0, async function (done) { + let conf: request.agent.Config = { + action: request.agent.Action.DOWNLOAD, + url: 'https://gitee.com/tiga-ultraman/downloadTests/releases/download/v1.01/test.txt', + saveas: `test.txt`, + mode: request.agent.Mode.FOREGROUND, + overwrite: true, + } + let task = await request.agent.create(context, conf); + task.on('response', (response: request.agent.HttpResponse) => { + expect(response.statusCode).assertEqual(200); + expect(response.version).assertEqual("HTTP/1.1"); + expect(response.reason).assertEqual("OK"); + done(); + }) + await task.start(); + }) + + /** + * @tc.number: testOnResponse002 + * @tc.name: testOnResponse002 + * @tc.desc: Test off response + * @tc.size: MediumTest + * @tc.type: Function + * @tc.level: Level 1 + * @tc.require: + */ + it('testOnResponse002', 0, async function (done) { + let conf = { + action: request.agent.Action.DOWNLOAD, + url: 'https://gitee.com/tiga-ultraman/downloadTests/releases/download/v1.01/test.txt', + saveas: `test.txt`, + mode: request.agent.Mode.FOREGROUND, + overwrite: true, + } + let callback = (response: request.agent.HttpResponse) => { + expect(false).assertTrue(); + done(); + }; + let task = await request.agent.create(context, conf); + task.on('response', callback) + task.off('response', callback); + task.on('completed', function (progress) { + expect(progress.state).assertEqual(request.agent.State.COMPLETED); + done(); + }) + await task.start(); + }) + + /** + * @tc.number: testOnResponse003 + * @tc.name: testOnResponse003 + * @tc.desc: Test off all response + * @tc.size: MediumTest + * @tc.type: Function + * @tc.level: Level 1 + * @tc.require: + */ + it('testOnResponse003', 0, async function (done) { + let conf = { + action: request.agent.Action.DOWNLOAD, + url: 'https://gitee.com/tiga-ultraman/downloadTests/releases/download/v1.01/test.txt', + saveas: `test.txt`, + mode: request.agent.Mode.FOREGROUND, + overwrite: true, + } + let task = await request.agent.create(context, conf); + task.on('response', function (response: request.agent.HttpResponse) { + expect(false).assertTrue(); + done(); + }) + task.on('response', function (response: request.agent.HttpResponse) { + expect(response.statusCode).assertEqual(200); + expect(false).assertTrue(); + done(); + }) + task.off('response'); + task.on('completed', function (progress) { + expect(progress.state).assertEqual(request.agent.State.COMPLETED); + done(); + }) + await task.start(); + }) + + /** + * @tc.number: testOnResponse004 + * @tc.name: testOnResponse004 + * @tc.desc: Test off with no response + * @tc.size: MediumTest + * @tc.type: Function + * @tc.level: Level 1 + * @tc.require: + */ + it('testOnResponse004', 0, async function (done) { + let conf = { + action: request.agent.Action.DOWNLOAD, + url: 'https://gitee.com/tiga-ultraman/downloadTests/releases/download/v1.01/test.txt', + saveas: `test.txt`, + mode: request.agent.Mode.FOREGROUND, + overwrite: true, + } + let task = await request.agent.create(context, conf); + task.off('response'); + task.on('completed', function (progress) { + expect(progress.state).assertEqual(request.agent.State.COMPLETED); + done(); + }) + await task.start(); + }) + + /** + * @tc.number: testOnResponse005 + * @tc.name: testOnResponse005 + * @tc.desc: Test on response with a bad request + * @tc.size: MediumTest + * @tc.type: Function + * @tc.level: Level 1 + * @tc.require: + */ + it('testOnResponse005', 0, async function (done) { + let conf = { + action: request.agent.Action.DOWNLOAD, + url: 'https://gitee.com/tiga-ultraman/downloadTests/releases/download/v1.01/test.jpg', + saveas: `test.txt`, + mode: request.agent.Mode.FOREGROUND, + overwrite: true, + } + let task = await request.agent.create(context, conf); + task.on('response', function (response: request.agent.HttpResponse) { + expect(response.statusCode).assertEqual(404); + expect(response.version).assertEqual("HTTP/1.1"); + expect(response.reason).assertEqual("Not Found"); + task.off('response'); + done(); + }) + await task.start(); + }) }) } \ No newline at end of file