From 0fb8e9430264a08724e71fa061b49dfd6501e742 Mon Sep 17 00:00:00 2001 From: fqwert Date: Mon, 22 Jul 2024 20:34:16 +0800 Subject: [PATCH] =?UTF-8?q?task=20=E5=A4=84=E7=90=86=E9=80=BB=E8=BE=91?= =?UTF-8?q?=E8=B0=83=E6=95=B4=EF=BC=8C=E6=B7=BB=E5=8A=A0=E6=B5=8B=E8=AF=95?= =?UTF-8?q?,=20=E5=A2=9E=E5=8A=A0no=5Foh=20=E4=B8=8B=E6=B5=8B=E8=AF=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: fqwert Change-Id: I8b45d616a55bdaa90046f7a079e81ff56a442980 Signed-off-by: fqwert --- services/Cargo.toml | 29 +- services/include/network.h | 3 + services/src/cxx/c_request_database.cpp | 3 + services/src/cxx/network.cpp | 87 +-- services/src/lib.rs | 31 +- services/src/macros.rs | 44 ++ services/src/manage/account.rs | 1 - services/src/manage/app_state/listener.rs | 2 +- services/src/manage/config/cert_manager.rs | 2 +- services/src/manage/config/system_proxy.rs | 2 +- services/src/manage/database.rs | 7 +- services/src/manage/events/show.rs | 2 +- services/src/manage/mod.rs | 26 +- services/src/manage/network.rs | 128 +++- services/src/manage/scheduler/queue/mod.rs | 15 +- .../manage/scheduler/queue/running_task.rs | 186 +----- services/src/service/client/mod.rs | 1 - services/src/task/client.rs | 21 +- services/src/task/config.rs | 46 +- services/src/task/download.rs | 560 +++++++++++++++--- services/src/task/ffi.rs | 33 +- services/src/task/info.rs | 10 +- services/src/task/mod.rs | 10 +- services/src/task/notify.rs | 3 +- services/src/task/operator.rs | 89 ++- services/src/task/request_task.rs | 539 ++++++++--------- services/src/task/upload.rs | 308 ++++++---- services/src/utils/c_wrapper.rs | 12 +- services/src/utils/form_item.rs | 6 +- services/src/utils/mod.rs | 22 +- services/src/utils/url_policy.rs | 1 - services/tests/BUILD.gn | 5 +- services/tests/construct.rs | 1 + services/tests/entry.rs | 3 +- services/tests/resume.rs | 53 ++ services/tests/search.rs | 2 +- services/tests/start.rs | 10 +- test/rustest/BUILD.gn | 2 +- test/rustest/Cargo.toml | 19 +- test/rustest/src/lib.rs | 26 + 40 files changed, 1513 insertions(+), 837 deletions(-) create mode 100644 services/src/macros.rs create mode 100644 services/tests/resume.rs diff --git a/services/Cargo.toml b/services/Cargo.toml index e724613f..3db66b69 100644 --- a/services/Cargo.toml +++ b/services/Cargo.toml @@ -18,9 +18,16 @@ edition = "2021" license = "Apache-2.0" [features] -default = [] +default = ["oh"] -oh = [] +oh = [ + "hilog_rust", + "hisysevent", + "hitrace_meter_rust", + "samgr", + "ipc", + "system_ability_fwk", +] [dependencies] ylong_runtime = { git = "https://gitee.com/openharmony/commonlibrary_rust_ylong_runtime", features = ["full"] } @@ -31,14 +38,18 @@ ylong_http_client = { git = "https://gitee.com/openharmony/commonlibrary_rust_yl "ylong_base", ] } -hilog_rust = { git = "https://gitee.com/openharmony/hiviewdfx_hilog" } -hisysevent = { git = "https://gitee.com/openharmony/hiviewdfx_hisysevent" } -hitrace_meter_rust = { git = "https://gitee.com/openharmony/hiviewdfx_hitrace" } +hilog_rust = { git = "https://gitee.com/openharmony/hiviewdfx_hilog", optional = true } +hisysevent = { git = "https://gitee.com/openharmony/hiviewdfx_hisysevent", optional = true } +hitrace_meter_rust = { git = "https://gitee.com/openharmony/hiviewdfx_hitrace", optional = true } cxx = "1.0.115" -system_ability_fwk = { git = "https://gitee.com/openharmony/systemabilitymgr_safwk" } -samgr = { git = "https://gitee.com/openharmony/systemabilitymgr_samgr" } -ipc = { git = "https://gitee.com/openharmony/communication_ipc" } +system_ability_fwk = { git = "https://gitee.com/openharmony/systemabilitymgr_safwk", optional = true } +samgr = { git = "https://gitee.com/openharmony/systemabilitymgr_samgr", optional = true } +ipc = { git = "https://gitee.com/openharmony/communication_ipc", optional = true } +log = "0.4.22" +env_logger = "0.11.3" +mockall = { version = "0.12.1", features = ["nightly"] } +mockall_double = "0.3.1" [dev-dependencies] -test_common = { path = "../test/rustest" } +test_common = { path = "../test/rustest", features = ["oh"] } diff --git a/services/include/network.h b/services/include/network.h index 87400c4c..b40f9a1e 100644 --- a/services/include/network.h +++ b/services/include/network.h @@ -34,11 +34,14 @@ public: rust::fn notifyTaskManagerOnline, rust::fn notifyTaskManagerOffline); ~RequestNetCallbackStub(); + + int32_t NetAvailable(sptr &netHandle) override; int32_t NetLost(sptr &netHandle) override; int32_t NetUnavailable() override; int32_t NetCapabilitiesChange(sptr &netHandle, const sptr &netAllCap) override; private: + void HandleNetCap(const sptr &netAllCap); bool IsRoaming(); NetworkInner *networkNotifier_; NetworkTaskManagerTx *task_manager_; diff --git a/services/src/cxx/c_request_database.cpp b/services/src/cxx/c_request_database.cpp index ee6c7ac9..7d429c6d 100644 --- a/services/src/cxx/c_request_database.cpp +++ b/services/src/cxx/c_request_database.cpp @@ -16,10 +16,13 @@ #include "c_request_database.h" #include +#include #include #include +#include #include +#include #include "base/request/request/common/include/log.h" #include "cxx.h" diff --git a/services/src/cxx/network.cpp b/services/src/cxx/network.cpp index cf19c631..50a4eb2d 100644 --- a/services/src/cxx/network.cpp +++ b/services/src/cxx/network.cpp @@ -24,6 +24,7 @@ #include "net_conn_callback_stub.h" #include "net_conn_client.h" #include "net_specifier.h" +#include "refbase.h" #ifdef REQUEST_TELEPHONY_CORE_SERVICE #include "cellular_data_client.h" @@ -59,6 +60,58 @@ RequestNetCallbackStub::~RequestNetCallbackStub() rust::Box::from_raw(task_manager_); } +void RequestNetCallbackStub::HandleNetCap(const sptr &netAllCap) +{ + if (netAllCap->netCaps_.find(NetCap::NET_CAPABILITY_VALIDATED) == netAllCap->netCaps_.end()) { + networkNotifier_->notify_offline(); + notifyTaskManagerOffline_(*task_manager_); + return; + } + + for (auto bearerType : netAllCap->bearerTypes_) { + auto networkInfo = NetworkInfo(); + if (bearerType == NetManagerStandard::NetBearType::BEARER_WIFI) { + networkInfo.network_type = NetworkType::Wifi; + networkInfo.is_metered = false; + networkInfo.is_roaming = false; + + if (networkNotifier_->notify_online(networkInfo)) { + notifyTaskManagerOnline_(*task_manager_); + } + return; + } else if (bearerType == NetManagerStandard::NetBearType::BEARER_CELLULAR) { + networkInfo.network_type = NetworkType::Cellular; + networkInfo.is_metered = true; + networkInfo.is_roaming = this->IsRoaming(); + + if (networkNotifier_->notify_online(networkInfo)) { + notifyTaskManagerOnline_(*task_manager_); + } + return; + }; + } + if (networkNotifier_->notify_online(NetworkInfo{ + .network_type = NetworkType::Other, + .is_metered = false, + .is_roaming = false, + })) { + notifyTaskManagerOnline_(*task_manager_); + } + return; +} + +int32_t RequestNetCallbackStub::NetAvailable(sptr &netHandle) +{ + sptr netAllCap = sptr::MakeSptr(); + int32_t ret = NetConnClient::GetInstance().GetNetCapabilities(*netHandle, *netAllCap); + if (ret != 0) { + REQUEST_HILOGE("GetNetCapabilities failed, ret = %{public}d", ret); + return ret; + } + this->HandleNetCap(netAllCap); + return 0; +} + int32_t RequestNetCallbackStub::NetLost(sptr &netHandle) { networkNotifier_->notify_offline(); @@ -77,39 +130,7 @@ int32_t RequestNetCallbackStub::NetCapabilitiesChange( sptr &netHandle, const sptr &netAllCap) { REQUEST_HILOGI("NetCapabilitiesChange"); - if (netAllCap->netCaps_.find(NetCap::NET_CAPABILITY_VALIDATED) == netAllCap->netCaps_.end()) { - networkNotifier_->notify_offline(); - return 0; - } - - for (auto bearerType : netAllCap->bearerTypes_) { - if (bearerType == NetManagerStandard::NetBearType::BEARER_WIFI) { - if (networkNotifier_->notify_online(NetworkInfo{ - .network_type = NetworkType::Wifi, - .is_metered = false, - .is_roaming = false, - })) { - notifyTaskManagerOnline_(*task_manager_); - } - return 0; - } else if (bearerType == NetManagerStandard::NetBearType::BEARER_CELLULAR) { - if (networkNotifier_->notify_online(NetworkInfo{ - .network_type = NetworkType::Cellular, - .is_metered = true, - .is_roaming = this->IsRoaming(), - })) { - notifyTaskManagerOnline_(*task_manager_); - } - return 0; - }; - } - if (networkNotifier_->notify_online(NetworkInfo{ - .network_type = NetworkType::Other, - .is_metered = false, - .is_roaming = false, - })) { - notifyTaskManagerOnline_(*task_manager_); - } + this->HandleNetCap(netAllCap); return 0; } diff --git a/services/src/lib.rs b/services/src/lib.rs index 137024fd..9f8c279d 100644 --- a/services/src/lib.rs +++ b/services/src/lib.rs @@ -22,24 +22,32 @@ clippy::enum_variant_names, clippy::clone_on_copy )] -#[macro_use] -mod hilog; -pub mod ability; +#[macro_use] +mod macros; + +#[cfg(not(feature = "oh"))] +#[macro_use] +extern crate log; + +cfg_oh! { + #[macro_use] + mod hilog; + mod trace; + mod service; + pub mod ability; + mod sys_event; + pub use service::interface; + pub use utils::form_item::FileSpec; +} + mod error; mod manage; -mod service; -mod sys_event; mod task; - -#[cfg(feature = "oh")] -mod trace; mod utils; - -pub use service::interface; pub use task::{config, info}; -pub use utils::form_item::FileSpec; +cfg_oh! { #[cfg(not(test))] const LOG_LABEL: hilog_rust::HiLogLabel = hilog_rust::HiLogLabel { log_type: hilog_rust::LogType::LogCore, @@ -68,3 +76,4 @@ mod tests { fn SetAccessTokenPermission(); } } +} diff --git a/services/src/macros.rs b/services/src/macros.rs new file mode 100644 index 00000000..7baaa6d6 --- /dev/null +++ b/services/src/macros.rs @@ -0,0 +1,44 @@ +// Copyright (C) 2023 Huawei Device Co., Ltd. +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +macro_rules! cfg_oh { + ($($item:item)*) => { + $( + #[cfg(feature = "oh")] + $item + )* + } +} + +macro_rules! cfg_not_oh { + ($($item:item)*) => { + $( + #[cfg(not(feature = "oh"))] + $item + )* + } +} + +#[cfg(not(feature = "oh"))] +macro_rules! cvt_res_error { + ($res: expr, $($args:tt)*) => {{ + match $res { + Ok(value) => value, + Err(e) => { + error!($($args)*); + error!("Error msg: {:?}", e); + return Err(e); + } + } + }} +} diff --git a/services/src/manage/account.rs b/services/src/manage/account.rs index 467e7377..a54617fb 100644 --- a/services/src/manage/account.rs +++ b/services/src/manage/account.rs @@ -295,7 +295,6 @@ impl RequestDb { #[cxx::bridge(namespace = "OHOS::Request")] mod ffi { - #[repr(i32)] enum OS_ACCOUNT_SUBSCRIBE_TYPE { INVALID_TYPE = -1, diff --git a/services/src/manage/app_state/listener.rs b/services/src/manage/app_state/listener.rs index 9e75c6dc..94462477 100644 --- a/services/src/manage/app_state/listener.rs +++ b/services/src/manage/app_state/listener.rs @@ -75,7 +75,7 @@ extern "C" fn process_state_change_callback(uid: i32, state: i32, pid: i32) { } } -#[link(name = "download_server_cxx", kind = "static")] +#[cfg(feature = "oh")] extern "C" { fn RegisterAPPStateCallback(f: extern "C" fn(i32, i32, i32)); fn RegisterProcessStateCallback(f: extern "C" fn(i32, i32, i32)); diff --git a/services/src/manage/config/cert_manager.rs b/services/src/manage/config/cert_manager.rs index 5ac48337..dbb03b62 100644 --- a/services/src/manage/config/cert_manager.rs +++ b/services/src/manage/config/cert_manager.rs @@ -96,7 +96,7 @@ fn update_system_cert(info: &Arc>) { }; } -#[link(name = "download_server_cxx", kind = "static")] +#[cfg(feature = "oh")] extern "C" { pub(crate) fn GetUserCertsData() -> *const CRequestCerts; pub(crate) fn FreeCertDataList(certs: *const CRequestCerts); diff --git a/services/src/manage/config/system_proxy.rs b/services/src/manage/config/system_proxy.rs index f1ad7265..2b6ae192 100644 --- a/services/src/manage/config/system_proxy.rs +++ b/services/src/manage/config/system_proxy.rs @@ -37,7 +37,7 @@ impl SystemProxyManager { } } -#[link(name = "download_server_cxx", kind = "static")] +#[cfg(feature = "oh")] extern "C" { pub(crate) fn RegisterProxySubscriber(); pub(crate) fn GetHost() -> CStringWrapper; diff --git a/services/src/manage/database.rs b/services/src/manage/database.rs index 3d798764..b33c022a 100644 --- a/services/src/manage/database.rs +++ b/services/src/manage/database.rs @@ -267,7 +267,8 @@ pub(crate) struct TaskQosInfo { pub(crate) priority: u32, } -#[link(name = "download_server_cxx", kind = "static")] +#[cfg(feature = "oh")] + extern "C" { fn DeleteCTaskConfig(ptr: *const CTaskConfig); fn DeleteCTaskInfo(ptr: *const CTaskInfo); @@ -415,6 +416,7 @@ mod test { use crate::config::Mode; use crate::task::info::State; use crate::tests::{test_init, DB_LOCK}; + use crate::utils::get_current_timestamp; use crate::utils::task_id_generator::TaskIdGenerator; #[test] @@ -473,8 +475,9 @@ mod test { test_init(); let _lock = DB_LOCK.lock().unwrap(); - let uid = 123456789; + let uid = get_current_timestamp() as u64; let mut db = RequestDb::get_instance(); + db.execute(&format!( "INSERT INTO request_task (task_id, uid, state, mode) VALUES ({}, {}, {}, {})", TaskIdGenerator::generate(), diff --git a/services/src/manage/events/show.rs b/services/src/manage/events/show.rs index cf4f4005..1d488737 100644 --- a/services/src/manage/events/show.rs +++ b/services/src/manage/events/show.rs @@ -21,7 +21,7 @@ impl TaskManager { match Database::get_instance().get_task_info(task_id) { Some(info) if info.uid() == uid => { - debug!("TaskManager Show: task info is {:?}", info); + info!("TaskManager Show: task info is {:?}", info); Some(info) } _ => { diff --git a/services/src/manage/mod.rs b/services/src/manage/mod.rs index e908d8c5..55ca1207 100644 --- a/services/src/manage/mod.rs +++ b/services/src/manage/mod.rs @@ -11,15 +11,19 @@ // See the License for the specific language governing permissions and // limitations under the License. -pub(crate) mod account; -pub(crate) mod app_state; -pub(crate) mod config; -pub(crate) mod database; -pub(crate) mod events; -pub(crate) mod network; -pub(crate) mod notifier; -pub(crate) mod scheduler; -pub(crate) mod task_manager; -pub(crate) use config::{SystemConfig, SystemConfigManager}; +cfg_oh! { + pub(crate) mod account; + pub(crate) mod app_state; + pub(crate) mod config; + pub(crate) mod database; + pub(crate) mod events; + pub(crate) mod notifier; + pub(crate) mod scheduler; + pub(crate) mod task_manager; + pub(crate) use config::{SystemConfig, SystemConfigManager}; + pub(crate) use task_manager::TaskManager; +} + +mod network; + pub(crate) use network::Network; -pub(crate) use task_manager::TaskManager; diff --git a/services/src/manage/network.rs b/services/src/manage/network.rs index e53aaaa8..4cb798af 100644 --- a/services/src/manage/network.rs +++ b/services/src/manage/network.rs @@ -10,16 +10,23 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. -use std::sync::{Arc, RwLock, RwLockReadGuard}; +use std::sync::{Arc, RwLock}; use cxx::UniquePtr; use ffi::NetworkRegistry; pub(crate) use ffi::{NetworkInfo, NetworkType}; use NetworkState::{Offline, Online}; -use super::database::RequestDb; -use super::events::TaskManagerEvent; -use super::task_manager::TaskManagerTx; +cfg_oh! { + use super::database::RequestDb; + use super::events::TaskManagerEvent; + use super::task_manager::TaskManagerTx; +} + +cfg_not_oh! { + use mockall::automock; +} + use crate::task::config::{NetworkConfig, TaskConfig}; use crate::task::info::State; use crate::task::reason::Reason; @@ -28,6 +35,7 @@ use crate::utils::get_current_timestamp; #[derive(Clone)] pub struct Network { inner: NetworkInner, + #[cfg(feature = "oh")] _registry: Arc>, } @@ -38,16 +46,25 @@ pub(crate) enum NetworkState { } impl Network { + #![allow(unused)] + pub(crate) fn new() -> Self { + Self { + inner: NetworkInner::new(), + #[cfg(feature = "oh")] + _registry: Arc::new(UniquePtr::null()), + } + } + pub(crate) fn is_online(&self) -> bool { matches!(*self.inner.state.read().unwrap(), Online(_)) } - pub(crate) fn state(&self) -> RwLockReadGuard { - self.inner.state.read().unwrap() + pub(crate) fn state(&self) -> NetworkState { + self.inner.state.read().unwrap().clone() } pub(crate) fn satisfied_state(&self, config: &TaskConfig) -> bool { - match &*self.state() { + match self.state() { // Handles in `RequestTask::network_online`. NetworkState::Offline => true, NetworkState::Online(info) => { @@ -87,8 +104,11 @@ impl Network { } } -pub(crate) fn register_network_change(task_manager: TaskManagerTx) -> Network { +pub(crate) fn register_network_change( + #[cfg(feature = "oh")] task_manager: TaskManagerTx, +) -> Network { let inner = NetworkInner::new(); + #[cfg(feature = "oh")] let registry = ffi::RegisterNetworkChange( Box::new(inner.clone()), Box::new(NetworkTaskManagerTx { @@ -105,6 +125,7 @@ pub(crate) fn register_network_change(task_manager: TaskManagerTx) -> Network { .send_event(TaskManagerEvent::network_offline()); }, ); + #[cfg(feature = "oh")] if registry.is_null() { error!("RegisterNetworkChange failed sleep 1s and retry"); #[cfg(not(test))] @@ -115,6 +136,7 @@ pub(crate) fn register_network_change(task_manager: TaskManagerTx) -> Network { } Network { inner, + #[cfg(feature = "oh")] _registry: Arc::new(registry), } } @@ -126,6 +148,7 @@ pub struct NetworkInner { } pub struct NetworkTaskManagerTx { + #[cfg(feature = "oh")] inner: TaskManagerTx, } @@ -144,6 +167,8 @@ impl NetworkInner { *self.last_notify_time.write().unwrap() = get_current_timestamp(); *state = Offline; + + #[cfg(feature = "oh")] self.update_database(NetworkState::Offline); } } @@ -158,6 +183,7 @@ impl NetworkInner { *state = Online(info.clone()); + #[cfg(feature = "oh")] self.update_database(Online(info)); } else { info!("Network change with the same: {:?}", info); @@ -165,6 +191,7 @@ impl NetworkInner { ret } + #[cfg(feature = "oh")] fn update_database(&self, state: NetworkState) { let mut database = RequestDb::get_instance(); match state { @@ -177,6 +204,7 @@ impl NetworkInner { } } +#[cfg(feature = "oh")] impl RequestDb { fn update_for_network_available(&mut self, info: &NetworkInfo) { let mut sql = format!( @@ -208,8 +236,9 @@ impl RequestDb { fn update_for_network_unavailable(&mut self, info: &NetworkInfo) { let mut sql = format!( - "UPDATE request_task SET state = {}, reason = {} WHERE ((state = {} AND reason = {} ) OR state = {} OR state = {})", + "UPDATE request_task SET state = {}, retry = {}, reason = {} WHERE ((state = {} AND reason = {} ) OR state = {} OR state = {})", State::Waiting.repr, + 1, Reason::UnsupportedNetworkType.repr, State::Waiting.repr, Reason::RunningTaskMeetLimits.repr, @@ -248,8 +277,9 @@ impl RequestDb { fn update_for_network_offline(&mut self) { let sql = format!( - "UPDATE request_task SET state = {}, reason = {} WHERE (state = {} AND reason = {} OR state = {} OR state = {})", + "UPDATE request_task SET state = {}, retry = {}, reason = {} WHERE (state = {} AND reason = {} OR state = {} OR state = {})", State::Waiting.repr, + 1, Reason::UnsupportedNetworkType.repr, State::Waiting.repr, Reason::RunningTaskMeetLimits.repr, @@ -304,6 +334,7 @@ mod ffi { } } +#[cfg(feature = "oh")] #[cfg(test)] mod test { @@ -338,7 +369,7 @@ mod test { assert!(network.is_online()); assert_eq!( - *network.state(), + network.state(), Online(NetworkInfo { network_type: NetworkType::Wifi, is_metered: false, @@ -357,7 +388,7 @@ mod test { assert!(network.is_online()); assert_eq!( - *network.state(), + network.state(), Online(NetworkInfo { network_type: NetworkType::Cellular, is_metered: true, @@ -374,7 +405,7 @@ mod test { let network = register_network_change(task_manager_tx); assert!(network.is_online()); assert_eq!( - *network.state(), + network.state(), Online(NetworkInfo { network_type: NetworkType::Wifi, is_metered: false, @@ -598,3 +629,74 @@ mod test { assert!(v.contains(&task_id)); } } + +#[cfg(not(feature = "oh"))] +#[cfg(test)] +mod test { + use core::{net, time}; + use std::fs::File; + use std::future::join; + use std::io::{Seek, SeekFrom, Write}; + use std::sync::Arc; + + use mockall_double::double; + use ylong_runtime::io::AsyncSeekExt; + + use super::NetworkType; + use crate::config::{Action, ConfigBuilder, Mode, TaskConfig}; + use crate::info::State; + use crate::manage::Network; + use crate::task::download; + use crate::task::download::download_inner; + use crate::task::reason::Reason; + use crate::task::request_task::{check_config, RequestTask, TaskError, TaskPhase}; + use crate::utils::form_item::FileSpec; + + const GITEE_FILE_LEN: u64 = 1042003; + const FS_FILE_LEN: u64 = 274619168; + + fn build_task(config: TaskConfig, network: Network) -> Arc { + let (files, client) = check_config(&config).unwrap(); + let task = Arc::new(RequestTask::new(config, files, client, network)); + task.status.lock().unwrap().state = State::Initialized; + task + } + + fn init() { + let _ = env_logger::builder().is_test(true).try_init(); + std::fs::create_dir("test_files/"); + } + #[test] + fn ut_network_status_error() { + init(); + let file_path = "test_files/ut_network_status_error.txt"; + + let file = File::create(file_path).unwrap(); + let config = ConfigBuilder::new() + .action(Action::Download) + .mode(Mode::BackGround) + .file_spec(file) + .url("https://www.gitee.com/tiga-ultraman/downloadTests/releases/download/v1.01/test.txt") + .redirect(true) + .metered(false) + .retry(true) + .build(); + let network = Network::new(); + network.inner.notify_online(super::NetworkInfo { + network_type: NetworkType::Wifi, + is_metered: true, + is_roaming: false, + }); + + let task = build_task(config, network); + ylong_runtime::block_on(async { + let err = download_inner(task.clone()).await.unwrap_err(); + assert_eq!(task.status.lock().unwrap().state, State::Waiting); + assert_eq!( + task.status.lock().unwrap().reason, + Reason::UnsupportedNetworkType + ); + assert_eq!(err, TaskError::Waiting(TaskPhase::UserAbort)); + }); + } +} diff --git a/services/src/manage/scheduler/queue/mod.rs b/services/src/manage/scheduler/queue/mod.rs index ebe17903..ff6faf0b 100644 --- a/services/src/manage/scheduler/queue/mod.rs +++ b/services/src/manage/scheduler/queue/mod.rs @@ -21,6 +21,7 @@ use std::sync::Arc; use keeper::SAKeeper; pub(crate) use notify_task::NotifyTask; +use ylong_runtime::sync::oneshot; use crate::ability::SYSTEM_CONFIG_MANAGER; use crate::error::ErrorCode; @@ -52,6 +53,7 @@ pub(crate) struct RunningQueue { runcount_manager: RunCountManagerEntry, client_manager: ClientManagerEntry, network: Network, + locks: HashMap>, } impl RunningQueue { @@ -80,6 +82,7 @@ impl RunningQueue { runcount_manager, client_manager, network, + locks: HashMap::new(), } } @@ -169,6 +172,11 @@ impl RunningQueue { // If the task is not in the current running queue, retrieve // the corresponding task from the database and start it. let system_config = unsafe { SYSTEM_CONFIG_MANAGER.assume_init_ref().system_config() }; + + if let Some(recv) = self.locks.remove(&task_id) { + let _ = recv.await; + } + let task = match Database::get_instance() .get_task( task_id, @@ -201,14 +209,15 @@ impl RunningQueue { continue; } }; - + let (lock, rx) = oneshot::channel(); + self.locks.insert(task_id, rx); let keeper = self.keeper.clone(); let tx = self.tx.clone(); let runcount_manager = self.runcount_manager.clone(); task.speed_limit(qos_direction.direction() as u64); new_queue.insert((uid, task_id), task.clone()); - let task = RunningTask::new(runcount_manager, task, tx, keeper); - if !task.satisfied() { + let task = RunningTask::new(runcount_manager, task, tx, keeper, lock); + if !task.satisfied().await { continue; } self.join_handles.push(runtime_spawn(async move { diff --git a/services/src/manage/scheduler/queue/running_task.rs b/services/src/manage/scheduler/queue/running_task.rs index ba874969..a5c5dbf6 100644 --- a/services/src/manage/scheduler/queue/running_task.rs +++ b/services/src/manage/scheduler/queue/running_task.rs @@ -11,24 +11,18 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::io::SeekFrom; use std::ops::Deref; -use std::sync::atomic::Ordering; use std::sync::Arc; -use ylong_runtime::io::AsyncSeekExt; +use ylong_runtime::sync::oneshot; -use crate::manage::database::Database; use crate::manage::events::{TaskEvent, TaskManagerEvent}; -use crate::manage::notifier::Notifier; use crate::manage::scheduler::queue::keeper::SAKeeper; use crate::manage::scheduler::queue::notify_task::NotifyTask; use crate::manage::task_manager::TaskManagerTx; use crate::service::runcount::{RunCountEvent, RunCountManagerEntry}; use crate::task::config::Action; use crate::task::download::download; -use crate::task::info::{State, UpdateInfo}; -use crate::task::reason::Reason; use crate::task::request_task::RequestTask; use crate::task::upload::upload; @@ -36,6 +30,7 @@ pub(crate) struct RunningTask { runcount_manager: RunCountManagerEntry, task: NotifyTask, tx: TaskManagerTx, + lock: Option>, // `_keeper` is never used when executing the task. _keeper: SAKeeper, } @@ -46,183 +41,30 @@ impl RunningTask { task: Arc, tx: TaskManagerTx, keeper: SAKeeper, + lock: oneshot::Sender<()>, ) -> Self { // Task start to run, then running count +1. runcount_manager.send_event(RunCountEvent::change_runcount(1)); - { - let mut task_status = task.status.lock().unwrap(); - let from_state = task_status.state; - if from_state == State::Waiting - && (task_status.reason == Reason::NetworkOffline - || task_status.reason == Reason::UnsupportedNetworkType) - { - info!( - "Retry a waiting task with NetworkOffline/UnsupportedNetworkType, - uid:{}, task_id:{}", - task.conf.common_data.uid, task.conf.common_data.task_id - ); - task.retry.store(true, Ordering::SeqCst); - task.tries.fetch_add(1, Ordering::SeqCst); - let mut progress = task.progress.lock().unwrap(); - RequestTask::change_status( - &mut task_status, - &mut progress, - State::Retrying, - Reason::Default, - ); - task.set_code(progress.common_data.index, Reason::Default, false); - task.resume.store(true, Ordering::SeqCst); - let codes_guard = task.code.lock().unwrap(); - let update_info = UpdateInfo { - mtime: task_status.mtime, - reason: task_status.reason.repr, - progress: progress.clone(), - each_file_status: RequestTask::get_each_file_status_by_code( - &codes_guard, - &task.conf.file_specs, - ), - tries: task.tries.load(Ordering::SeqCst), - mime_type: task.mime_type.lock().unwrap().clone(), - }; - Database::get_instance().update_task(task.task_id(), update_info); - } else { - if from_state == State::Paused { - let notify_data = task.build_notify_data(); - Notifier::resume(&task.client_manager, notify_data); - } - let mut progress = task.progress.lock().unwrap(); - RequestTask::change_status( - &mut task_status, - &mut progress, - State::Running, - Reason::Default, - ); - task.set_code(progress.common_data.index, Reason::Default, false); - if from_state.check_resume() { - task.resume.store(true, Ordering::SeqCst); - } - let codes_guard = task.code.lock().unwrap(); - let update_info = UpdateInfo { - mtime: task_status.mtime, - reason: task_status.reason.repr, - progress: progress.clone(), - each_file_status: RequestTask::get_each_file_status_by_code( - &codes_guard, - &task.conf.file_specs, - ), - tries: task.tries.load(Ordering::SeqCst), - mime_type: task.mime_type.lock().unwrap().clone(), - }; - Database::get_instance().update_task(task.task_id(), update_info); - } - } Self { runcount_manager, task: NotifyTask::new(task), + lock: Some(lock), tx, _keeper: keeper, } } - pub(crate) async fn run(&self) { - let task = self; - let mut index = 0; - info!("run task: {}", task.conf.common_data.task_id); - - let action = task.conf.common_data.action; + pub(crate) async fn run(self) { + let action = self.conf.common_data.action; match action { - Action::Download => loop { - task.set_code(0, Reason::Default, true); - - download(task.task.clone()).await; - - let mut task_status = task.status.lock().unwrap(); - if !task_status.state.is_doing() { - break; - } - let mut progress = self.progress.lock().unwrap(); - let codes_guard = task.code.lock().unwrap(); - let reason = codes_guard.first(); - match reason { - Some(reason) => { - if *reason != Reason::Default { - RequestTask::change_status( - &mut task_status, - &mut progress, - State::Failed, - *reason, - ); - let update_info = UpdateInfo { - mtime: task_status.mtime, - reason: task_status.reason.repr, - progress: progress.clone(), - each_file_status: RequestTask::get_each_file_status_by_code( - &codes_guard, - &task.conf.file_specs, - ), - tries: task.tries.load(Ordering::SeqCst), - mime_type: task.mime_type.lock().unwrap().clone(), - }; - Database::get_instance().update_task(task.task_id(), update_info); - break; - } - } - None => break, - } - }, + Action::Download => { + download(self.task.clone()).await; + } Action::Upload => { - let state = task.status.lock().unwrap().state; - if state == State::Retrying { - index = { - let mut progress_guard = task.progress.lock().unwrap(); - let index = progress_guard.common_data.index; - progress_guard.common_data.total_processed -= - progress_guard.processed[index]; - progress_guard.processed[index] = 0; - index - }; - let file = task.files.get_mut(index).unwrap(); - let mut begins = task.conf.common_data.begins; - let (is_partial_upload, _) = task.get_upload_info(index); - if !is_partial_upload { - begins = 0; - } - if let Err(e) = file.seek(SeekFrom::Start(begins)).await { - task.set_code(index, Reason::IoError, false); - error!("seek err is {:?}", e); - } - } - upload(task.task.clone()).await; + upload(self.task.clone()).await; } _ => {} } - info!( - "task run end: {}, state: {:?}, reason: {:?}", - task.conf.common_data.task_id, - task.status.lock().unwrap().state, - task.code.lock().unwrap()[index] - ); - - let (state, reason) = { - let status = self.task.status.lock().unwrap(); - (status.state, status.reason) - }; - // Only tasks that cannot run automatically need to be removed from QoS - if state == State::Waiting && reason == Reason::RunningTaskMeetLimits { - return; - } - - // UserOperation tasks has been removed from qos in TaskManager - if reason == Reason::UserOperation { - return; - } - - let _ = self - .tx - .send_event(TaskManagerEvent::Task(TaskEvent::Finished( - self.task_id(), - self.uid(), - ))); } } @@ -236,6 +78,14 @@ impl Deref for RunningTask { impl Drop for RunningTask { fn drop(&mut self) { + info!("Task {} drop", self.task_id()); + + self.tx + .send_event(TaskManagerEvent::Task(TaskEvent::Finished( + self.task_id(), + self.uid(), + ))); + self.lock.take().unwrap().send(()).unwrap(); // Task finishes running, then running count -1. self.runcount_manager .send_event(RunCountEvent::change_runcount(-1)); diff --git a/services/src/service/client/mod.rs b/services/src/service/client/mod.rs index 656595ec..2cdca7ae 100644 --- a/services/src/service/client/mod.rs +++ b/services/src/service/client/mod.rs @@ -221,7 +221,6 @@ impl Client { .await; } } - debug!("Client handle message done"); } } diff --git a/services/src/task/client.rs b/services/src/task/client.rs index 482ed033..87f71589 100644 --- a/services/src/task/client.rs +++ b/services/src/task/client.rs @@ -18,18 +18,20 @@ use ylong_http_client::{ Certificate, HttpClientError, Proxy, PubKeyPins, Redirect, Timeout, TlsVersion, }; -use crate::manage::SystemConfig; +cfg_oh! { + use crate::manage::SystemConfig; + use crate::utils::url_policy::check_url_domain; +} use crate::task::config::{Action, TaskConfig}; use crate::task::files::{check_atomic_convert_path, convert_path}; use crate::task::ATOMIC_SERVICE; -use crate::utils::url_policy::check_url_domain; const CONNECT_TIMEOUT: u64 = 60; const SECONDS_IN_ONE_WEEK: u64 = 7 * 24 * 60 * 60; pub(crate) fn build_client( config: &TaskConfig, - mut system: SystemConfig, + #[cfg(feature = "oh")] mut system: SystemConfig, ) -> Result> { let mut client = Client::builder() .connect_timeout(Timeout::from_secs(CONNECT_TIMEOUT)) @@ -45,6 +47,7 @@ pub(crate) fn build_client( } // Set HTTP proxy. + #[cfg(feature = "oh")] if let Some(proxy) = build_task_proxy(config)? { client = client.proxy(proxy); } else if let Some(proxy) = build_system_proxy(&system)? { @@ -55,6 +58,7 @@ pub(crate) fn build_client( // redirected to HTTPS. // Set system certs. + #[cfg(feature = "oh")] if let Some(certs) = system.certs.take() { for cert in certs.into_iter() { client = client.add_root_certificate(cert) @@ -79,6 +83,7 @@ pub(crate) fn build_client( "ApiPolicy Domain check, task_id is {}, bundle is {}, domian_type is {}, url is {}", config.common_data.task_id, &config.bundle, &domain_type, &config.url ); + #[cfg(feature = "oh")] if let Some(is_accessed) = check_url_domain(&config.bundle, &domain_type, &config.url) { if !is_accessed { error!( @@ -96,8 +101,12 @@ pub(crate) fn build_client( ); } - let interceptors = DomainInterceptor::new(config.bundle.clone(), domain_type); - client = client.interceptor(interceptors); + #[cfg(feature = "oh")] + { + let interceptors = DomainInterceptor::new(config.bundle.clone(), domain_type); + client = client.interceptor(interceptors); + } + info!( "add interceptor domain check, tid: {}", config.common_data.task_id @@ -138,6 +147,7 @@ fn build_task_certificate_pins( ))) } +#[cfg(feature = "oh")] fn build_system_proxy( system: &SystemConfig, ) -> Result, Box> { @@ -207,6 +217,7 @@ impl DomainInterceptor { } } +#[cfg(feature = "oh")] impl Interceptor for DomainInterceptor { /// Intercepts the redirect request. fn intercept_redirect_request(&self, request: &Request) -> Result<(), HttpClientError> { diff --git a/services/src/task/config.rs b/services/src/task/config.rs index a9c93c51..8ed89e57 100644 --- a/services/src/task/config.rs +++ b/services/src/task/config.rs @@ -16,7 +16,10 @@ use std::fs::File; use std::os::fd::FromRawFd; pub use ffi::{Action, Mode}; -use ipc::parcel::Serialize; + +cfg_oh! { + use ipc::parcel::Serialize; +} use crate::utils::c_wrapper::{CFileSpec, CFormItem, CStringWrapper}; use crate::utils::form_item::{FileSpec, FormItem}; @@ -227,7 +230,7 @@ impl Default for TaskConfig { token: "xxx".to_string(), proxy: "".to_string(), extras: Default::default(), - version: Version::API9, + version: Version::API10, form_items: vec![], file_specs: vec![], body_file_paths: vec![], @@ -240,8 +243,8 @@ impl Default for TaskConfig { action: Action::Download, mode: Mode::BackGround, cover: false, - network_config: NetworkConfig::Wifi, - metered: true, + network_config: NetworkConfig::Any, + metered: false, roaming: false, retry: false, redirect: true, @@ -275,9 +278,15 @@ impl ConfigBuilder { self } + /// set version + pub fn version(&mut self, version: u8) -> &mut Self { + self.inner.version = version.into(); + self + } + /// Set title - pub fn file_spec)>(&mut self, op: F) -> &mut Self { - op(&mut self.inner.file_specs); + pub fn file_spec(&mut self, file: File) -> &mut Self { + self.inner.file_specs.push(FileSpec::user_file(file)); self } /// Set action @@ -332,8 +341,33 @@ impl ConfigBuilder { self.inner.common_data.redirect = redirect; self } + + /// begins + pub fn begins(&mut self, begins: u64) -> &mut Self { + self.inner.common_data.begins = begins; + self + } + + /// ends + pub fn ends(&mut self, ends: u64) -> &mut Self { + self.inner.common_data.ends = ends as i64; + self + } + + /// method + pub fn method(&mut self, metered: &str) -> &mut Self { + self.inner.method = metered.to_string(); + self + } + + /// retry + pub fn retry(&mut self, retry: bool) -> &mut Self { + self.inner.common_data.retry = retry; + self + } } +#[cfg(feature = "oh")] impl Serialize for TaskConfig { fn serialize(&self, parcel: &mut ipc::parcel::MsgParcel) -> ipc::IpcResult<()> { parcel.write(&(self.common_data.action.repr as u32))?; diff --git a/services/src/task/download.rs b/services/src/task/download.rs index 2d1302e9..6a59b060 100644 --- a/services/src/task/download.rs +++ b/services/src/task/download.rs @@ -11,16 +11,19 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::io::SeekFrom; use std::pin::Pin; use std::sync::atomic::Ordering; use std::sync::Arc; use std::task::{Context, Poll}; use ylong_http_client::async_impl::{DownloadOperator, Downloader, Response}; -use ylong_http_client::{HttpClientError, SpeedLimit, Timeout}; +use ylong_http_client::{ErrorKind, HttpClientError, SpeedLimit, Timeout}; +use ylong_runtime::io::AsyncSeekExt; use super::operator::TaskOperator; use super::reason::Reason; +use super::request_task::{TaskError, TaskPhase}; use crate::task::info::State; use crate::task::request_task::RequestTask; #[cfg(feature = "oh")] @@ -37,15 +40,6 @@ impl DownloadOperator for TaskOperator { cx: &mut Context<'_>, data: &[u8], ) -> Poll> { - if self.task.range_request.load(Ordering::SeqCst) { - if self.task.range_response.load(Ordering::SeqCst) { - return self.poll_write_file(cx, data, 0); - } - // write partial response data - let begins = self.task.conf.common_data.begins; - let ends = self.task.conf.common_data.ends; - return self.poll_write_partial_file(cx, data, begins, ends); - } self.poll_write_file(cx, data, 0) } @@ -74,86 +68,514 @@ pub(crate) fn build_downloader( } pub(crate) async fn download(task: Arc) { - download_inner(task.clone()).await; + loop { + if let Err(e) = download_inner(task.clone()).await { + match e { + TaskError::Waiting(phase) => match phase { + TaskPhase::NeedRetry => { + continue; + } + TaskPhase::UserAbort => { + let state = task.status.lock().unwrap().state; + if state == State::Running { + error!("task {} state is running with user abort", task.task_id()); + } + } + TaskPhase::NetworkOffline => {} + }, + TaskError::Failed(reason) => { + task.change_task_status(State::Failed, reason); + } + } + } + break; + } - use hisysevent::{build_number_param, build_str_param}; + #[cfg(feature = "oh")] + { + use hisysevent::{build_number_param, build_str_param}; - use crate::sys_event::SysEvent; - // `unwrap` for propagating panics among threads. + use crate::sys_event::SysEvent; + let reason = *task + .code + .lock() + .unwrap() + .first() + .unwrap_or(&Reason::Default); + // If `Reason` is not `Default`a records this sys event. - let reason = *task - .code - .lock() - .unwrap() - .first() - .unwrap_or(&Reason::Default); - // If `Reason` is not `Default`a records this sys event. - - if reason != Reason::Default { - SysEvent::task_fault() - .param(build_str_param!(crate::sys_event::TASKS_TYPE, "DOWNLOAD")) - .param(build_number_param!(crate::sys_event::TOTAL_FILE_NUM, 1)) - .param(build_number_param!(crate::sys_event::FAIL_FILE_NUM, 1)) - .param(build_number_param!(crate::sys_event::SUCCESS_FILE_NUM, 0)) - .param(build_number_param!( - crate::sys_event::ERROR_INFO, - reason.repr as i32 - )) - .write(); + if reason != Reason::Default { + SysEvent::task_fault() + .param(build_str_param!(crate::sys_event::TASKS_TYPE, "DOWNLOAD")) + .param(build_number_param!(crate::sys_event::TOTAL_FILE_NUM, 1)) + .param(build_number_param!(crate::sys_event::FAIL_FILE_NUM, 1)) + .param(build_number_param!(crate::sys_event::SUCCESS_FILE_NUM, 0)) + .param(build_number_param!( + crate::sys_event::ERROR_INFO, + reason.repr as i32 + )) + .write(); + } } } -async fn download_inner(task: Arc) { - debug!( - "begin download task, tid: {}", - task.conf.common_data.task_id - ); +impl RequestTask { + async fn prepare_download(&self) -> Result<(), TaskError> { + let file = self.files.get_mut(0).unwrap(); + file.seek(SeekFrom::End(0)); + let downloaded = file.metadata().await?.len() as usize; + let mut progress = self.progress.lock().unwrap(); + progress.common_data.index = 0; + progress.common_data.total_processed = downloaded; + progress.common_data.state = State::Running.repr; + progress.processed = vec![downloaded]; + progress.sizes = vec![-1]; + Ok(()) + } +} +pub(crate) async fn download_inner(task: Arc) -> Result<(), TaskError> { // Ensures `_trace` can only be freed when this function exits. #[cfg(feature = "oh")] let _trace = Trace::new("download file"); + task.prepare_running(); + task.prepare_download().await?; - let response = { - let request = match task.build_download_request().await { - Some(request) => request, - None => return, - }; + info!("download task {} start running", task.task_id()); - #[cfg(feature = "oh")] - { - let name = task.conf.file_specs[0].path.as_str(); - let download = task.progress.lock().unwrap().processed[0]; + task.range_response.store(false, Ordering::SeqCst); + task.range_request.store(false, Ordering::SeqCst); + let request = task.build_download_request().await?; - // Ensures `_trace` can only be freed when this function exits. - - let _trace = Trace::new(&format!( - "download file name: {name} downloaded size: {download}" - )); + let response = task.client.request(request).await; + match response.as_ref() { + Ok(response) => { + let status_code = response.status(); + #[cfg(feature = "oh")] + task.notify_response(response); + info!( + "task {} get http response code {}", + task.conf.common_data.task_id, status_code, + ); + if status_code.is_server_error() + || (status_code.as_u16() != 408 && status_code.is_client_error()) + || status_code.is_redirection() + { + return Err(TaskError::Failed(Reason::ProtocolError)); + } + if status_code.as_u16() == 408 { + if task.tries.load(Ordering::SeqCst) < 2 { + task.tries.fetch_add(1, Ordering::SeqCst); + info!("task {} server timeout", task.task_id()); + return Err(TaskError::Waiting(TaskPhase::NeedRetry)); + } else { + info!("task {} retry 3 times", task.task_id()); + return Err(TaskError::Failed(Reason::ProtocolError)); + } + } + if status_code.as_u16() == 200 { + if task.require_range() { + info!("task {} server not support range request", task.task_id()); + return Err(TaskError::Failed(Reason::UnsupportedRangeRequest)); + } + let file = task.files.get(0).unwrap(); + let has_downloaded = file.metadata().await?.len() > 0; + if has_downloaded { + error!("task {} file not cleared", task.task_id()); + task.clear_downloaded_file().await?; + } + } } + Err(e) => { + error!("Task {} {:?}", task.task_id(), e); - task.client.request(request).await + match e.error_kind() { + ErrorKind::Timeout => return Err(TaskError::Failed(Reason::ContinuousTaskTimeout)), + ErrorKind::Request => return Err(TaskError::Failed(Reason::RequestError)), + ErrorKind::Redirect => return Err(TaskError::Failed(Reason::RedirectError)), + ErrorKind::Connect | ErrorKind::ConnectionUpgrade => { + if task.tries.load(Ordering::SeqCst) < 2 { + task.tries.fetch_add(1, Ordering::SeqCst); + return Err(TaskError::Waiting(TaskPhase::NeedRetry)); + } + info!("task {} retry 3 times", task.task_id()); + if e.is_dns_error() { + return Err(TaskError::Failed(Reason::Dns)); + } else if e.is_tls_error() { + return Err(TaskError::Failed(Reason::Ssl)); + } else { + return Err(TaskError::Failed(Reason::Tcp)); + } + } + ErrorKind::BodyTransfer => return task.handle_body_transfer_error().await, + _ => { + if format!("{}", e).contains("No space left on device") { + return Err(TaskError::Failed(Reason::InsufficientSpace)); + } else { + return Err(TaskError::Failed(Reason::OthersError)); + } + } + }; + } }; - task.record_response_header(&response); - if !task.handle_response_error(&response).await { - error!("response error"); - return; - } + let response = response.unwrap(); - if !task.get_file_info(&response) { - return; + { + let mut guard = task.progress.lock().unwrap(); + guard.extras.clear(); + for (k, v) in response.headers() { + if let Ok(value) = v.to_string() { + guard.extras.insert(k.to_string().to_lowercase(), value); + } + } } + task.get_file_info(&response)?; + let mut downloader = build_downloader(task.clone(), response); - let result = downloader.download().await; - - if !task.handle_download_error(&result).await { - error!("handle_download_error"); - return; + if let Err(e) = downloader.download().await { + return task.handle_download_error(e).await; } + let file = task.files.get_mut(0).unwrap(); + file.sync_all().await?; - // Makes sure all the data has been written to the target file. - if let Some(file) = task.files.get(0) { - let _ = file.sync_all().await; - } task.change_task_status(State::Completed, Reason::Default); + info!("task {} download success", task.task_id()); + Ok(()) +} + +#[cfg(not(feature = "oh"))] +#[cfg(test)] +mod test { + use core::time; + use std::fs::File; + use std::io::{Seek, SeekFrom, Write}; + use std::sync::Arc; + + use ylong_runtime::io::AsyncSeekExt; + + use crate::config::{Action, ConfigBuilder, Mode, TaskConfig}; + use crate::info::State; + use crate::manage::Network; + use crate::task::download::{download_inner, TaskPhase}; + use crate::task::reason::Reason; + use crate::task::request_task::{check_config, RequestTask, TaskError}; + + const GITEE_FILE_LEN: u64 = 1042003; + const FS_FILE_LEN: u64 = 274619168; + + fn build_task(config: TaskConfig) -> Arc { + let (files, client) = check_config(&config).unwrap(); + let network = Network::new(); + let task = Arc::new(RequestTask::new(config, files, client, network)); + task.status.lock().unwrap().state = State::Initialized; + task + } + + fn init() { + let _ = env_logger::builder().is_test(true).try_init(); + let _ = std::fs::create_dir("test_files/"); + } + + #[test] + fn ut_download_basic() { + init(); + let file_path = "test_files/ut_download_basic.txt"; + + let file = File::create(file_path).unwrap(); + let config = ConfigBuilder::new() + .action(Action::Download) + .mode(Mode::BackGround) + .file_spec(file) + .url("https://www.gitee.com/tiga-ultraman/downloadTests/releases/download/v1.01/test.txt") + .redirect(true) + .build(); + + let task = build_task(config); + ylong_runtime::block_on(async { + download_inner(task).await.unwrap(); + let file = File::open(file_path).unwrap(); + assert_eq!(GITEE_FILE_LEN, file.metadata().unwrap().len()); + }); + } + + #[test] + fn ut_download_resume() { + init(); + let file_path = "test_files/ut_download_resume.txt"; + + let mut file = File::create(file_path).unwrap(); + file.write(&[0; GITEE_FILE_LEN as usize - 10000]).unwrap(); + + let config = ConfigBuilder::new() + .action(Action::Download) + .mode(Mode::BackGround) + .file_spec(file) + .url("https://www.gitee.com/tiga-ultraman/downloadTests/releases/download/v1.01/test.txt") + .redirect(true) + .build(); + let task = build_task(config); + ylong_runtime::block_on(async { + download_inner(task).await.unwrap(); + let file = File::open(file_path).unwrap(); + assert_eq!(GITEE_FILE_LEN, file.metadata().unwrap().len()); + }); + } + + #[test] + fn ut_download_not_support_range() { + init(); + let file_path = "test_files/ut_download_not_support_range.txt"; + + let file = File::create(file_path).unwrap(); + + let config = ConfigBuilder::new() + .action(Action::Download) + .mode(Mode::BackGround) + .file_spec(file) + .url("https://www.gitee.com/tiga-ultraman/downloadTests/releases/download/v1.01/test.txt") + .redirect(true) + .begins(5000) + .build(); + let task = build_task(config); + ylong_runtime::block_on(async { + let res = download_inner(task).await.unwrap_err(); + assert_eq!(res, TaskError::Failed(Reason::UnsupportedRangeRequest)); + let file = File::open(file_path).unwrap(); + assert_eq!(0, file.metadata().unwrap().len()); + }); + } + + #[test] + fn ut_download_resume_not_support_range() { + init(); + let file_path = "test_files/ut_download_resume_not_support_range.txt"; + + let file = File::create(file_path).unwrap(); + + let config = ConfigBuilder::new() + .action(Action::Download) + .mode(Mode::BackGround) + .file_spec(file) + .url("https://www.gitee.com/tiga-ultraman/downloadTests/releases/download/v1.01/test.txt") + .redirect(true) + .build(); + let task = build_task(config); + ylong_runtime::block_on(async { + let clone_task = task.clone(); + ylong_runtime::spawn(async move { + ylong_runtime::time::sleep(time::Duration::from_secs(2)).await; + clone_task.status.lock().unwrap().state = State::Waiting; + }); + let err = download_inner(task.clone()).await.unwrap_err(); + assert_eq!(err, TaskError::Waiting(TaskPhase::UserAbort)); + + let file = task.files.get_mut(0).unwrap(); + file.set_len(10000).await.unwrap(); + file.seek(SeekFrom::End(0)); + + download_inner(task.clone()).await.unwrap(); + let file = File::open(file_path).unwrap(); + assert_eq!(GITEE_FILE_LEN, file.metadata().unwrap().len()); + }); + } + + #[test] + fn ut_download_not_support_range_resume() { + init(); + let file_path = "test_files/ut_download_not_support_range_resume.txt"; + + let mut file = File::create(file_path).unwrap(); + file.write(&[0; 1000]).unwrap(); + + let config = ConfigBuilder::new() + .action(Action::Download) + .mode(Mode::BackGround) + .file_spec(file) + .url("https://www.gitee.com/tiga-ultraman/downloadTests/releases/download/v1.01/test.txt") + .redirect(true) + .begins(5000) + .build(); + let task = build_task(config); + ylong_runtime::block_on(async { + let res = download_inner(task).await.unwrap_err(); + assert_eq!(res, TaskError::Failed(Reason::UnsupportedRangeRequest)); + let file = File::open(file_path).unwrap(); + assert_eq!(1000, file.metadata().unwrap().len()); + }); + } + + #[test] + fn ut_download_range_0() { + init(); + let file_path = "test_files/ut_download_range_0.txt"; + let file = File::create(file_path).unwrap(); + let config = ConfigBuilder::new() + .action(Action::Download) + .mode(Mode::BackGround) + .file_spec(file) + .url("https://sf3-cn.feishucdn.com/obj/ee-appcenter/47273f95/Feishu-win32_ia32-7.9.7-signed.exe") + .redirect(true) + .begins(5000) + .ends(10000) + .build(); + let task = build_task(config); + ylong_runtime::block_on(async { + download_inner(task).await.unwrap(); + let file = File::open(file_path).unwrap(); + assert_eq!(5001, file.metadata().unwrap().len()); + }); + } + + #[test] + fn ut_download_range_1() { + init(); + let file_path = "test_files/ut_download_range_1.txt"; + + let file = File::create(file_path).unwrap(); + let config = ConfigBuilder::new() + .action(Action::Download) + .mode(Mode::BackGround) + .file_spec(file) + .url("https://sf3-cn.feishucdn.com/obj/ee-appcenter/47273f95/Feishu-win32_ia32-7.9.7-signed.exe") + .redirect(true) + .begins(273619168) + .build(); + let task = build_task(config); + ylong_runtime::block_on(async { + download_inner(task).await.unwrap(); + let file = File::open(file_path).unwrap(); + assert_eq!(FS_FILE_LEN - 273619168, file.metadata().unwrap().len()); + }); + } + + #[test] + fn ut_download_range_resume_0() { + init(); + let file_path = "test_files/ut_download_range_resume_0.txt"; + + let mut file = File::create(file_path).unwrap(); + file.write(&[0; FS_FILE_LEN as usize - 10000]).unwrap(); + let config = ConfigBuilder::new() + .action(Action::Download) + .mode(Mode::BackGround) + .file_spec(file) + .url("https://sf3-cn.feishucdn.com/obj/ee-appcenter/47273f95/Feishu-win32_ia32-7.9.7-signed.exe") + .redirect(true) + .build(); + let task = build_task(config); + ylong_runtime::block_on(async { + download_inner(task).await.unwrap(); + let file = File::open(file_path).unwrap(); + assert_eq!(FS_FILE_LEN, file.metadata().unwrap().len()); + }); + } + + #[test] + fn ut_download_range_resume_1() { + init(); + let file_path = "test_files/ut_download_range_resume_1.txt"; + + let file = File::create(file_path).unwrap(); + file.set_len(FS_FILE_LEN - 10000).unwrap(); + let config = ConfigBuilder::new() + .action(Action::Download) + .mode(Mode::BackGround) + .file_spec(file) + .url("https://sf3-cn.feishucdn.com/obj/ee-appcenter/47273f95/Feishu-win32_ia32-7.9.7-signed.exe") + .redirect(true) + .build(); + let task = build_task(config); + ylong_runtime::block_on(async { + let clone_task = task.clone(); + ylong_runtime::spawn(async move { + ylong_runtime::time::sleep(time::Duration::from_secs(2)).await; + clone_task.status.lock().unwrap().state = State::Waiting; + }); + let ret = download_inner(task.clone()).await.unwrap_err(); + assert_eq!(ret, TaskError::Waiting(TaskPhase::UserAbort)); + let file = File::open(file_path).unwrap(); + assert!(file.metadata().unwrap().len() < FS_FILE_LEN - 20000); + download_inner(task.clone()).await.unwrap(); + assert_eq!(file.metadata().unwrap().len(), FS_FILE_LEN); + }); + } + + #[test] + fn ut_download_invalid_task() { + init(); + let file_path = "test_files/ut_download_basic.txt"; + + let file = File::create(file_path).unwrap(); + let config = ConfigBuilder::new() + .action(Action::Download) + .mode(Mode::BackGround) + .file_spec(file) + .url("https://www.gitee.com/tiga-ultraman/downloadTests/releases/download/v1.01/test.txt") + .redirect(true) + .build(); + + let task = build_task(config); + { + let mut progress = task.progress.lock().unwrap(); + progress.sizes = vec![0]; + progress.processed = vec![]; + progress.common_data.index = 23; + progress.common_data.state = State::Failed.repr; + progress.common_data.total_processed = 321223; + } + ylong_runtime::block_on(async { + download_inner(task.clone()).await.unwrap(); + let file = File::open(file_path).unwrap(); + assert_eq!(GITEE_FILE_LEN, file.metadata().unwrap().len()); + + assert_eq!(State::Completed, task.status.lock().unwrap().state); + assert_eq!(0, task.progress.lock().unwrap().common_data.index); + assert_eq!( + GITEE_FILE_LEN, + task.progress.lock().unwrap().common_data.total_processed as u64 + ); + assert_eq!( + GITEE_FILE_LEN, + task.progress.lock().unwrap().processed[0] as u64 + ); + assert_eq!( + GITEE_FILE_LEN, + task.progress.lock().unwrap().sizes[0] as u64 + ); + }); + } + + /// For xts SUB_REQUEST_CROSSPLATFORM_DOWNDLOAD_API_TASKINFO_0002, + /// downloadTotalBytes to be -1 + #[test] + fn ut_download_sizes() { + init(); + let file_path = "test_files/ut_download_basic.txt"; + + let file = File::create(file_path).unwrap(); + let config = ConfigBuilder::new() + .action(Action::Download) + .mode(Mode::BackGround) + .file_spec(file) + .url("https://gitee.com/chenzhixue/downloadTest/releases/download/v1.0/test_not_exists.apk") + .redirect(true) + .build(); + + let task = build_task(config); + { + let mut progress = task.progress.lock().unwrap(); + progress.sizes = vec![0, 1, 2, 3]; + progress.processed = vec![]; + progress.common_data.index = 23; + progress.common_data.state = State::Failed.repr; + progress.common_data.total_processed = 321223; + } + ylong_runtime::block_on(async { + let err = download_inner(task.clone()).await.unwrap_err(); + assert_eq!(err, TaskError::Failed(Reason::ProtocolError)); + let sizes = task.progress.lock().unwrap().sizes.clone(); + assert_eq!(sizes, vec![-1]); + }); + } } diff --git a/services/src/task/ffi.rs b/services/src/task/ffi.rs index 9d1c1d35..df72da2b 100644 --- a/services/src/task/ffi.rs +++ b/services/src/task/ffi.rs @@ -18,9 +18,12 @@ use super::info::{CommonTaskInfo, InfoSet, TaskInfo, UpdateInfo}; use super::notify::{CommonProgress, EachFileStatus, Progress}; use super::reason::Reason; use crate::task::info::State; -use crate::utils::c_wrapper::{ - CFileSpec, CFormItem, CStringWrapper, DeleteCFileSpec, DeleteCFormItem, DeleteCStringPtr, -}; +use crate::utils::c_wrapper::{CFileSpec, CFormItem, CStringWrapper}; + +cfg_oh! { + use crate::utils::c_wrapper::{DeleteCFileSpec, DeleteCFormItem, DeleteCStringPtr}; +} + use crate::utils::form_item::{FileSpec, FormItem}; use crate::utils::{build_vec, get_current_timestamp, split_string, string_to_hashmap}; @@ -214,9 +217,12 @@ impl TaskInfo { common_data: c_struct.common_data, }; - unsafe { DeleteCFormItem(c_struct.form_items_ptr) }; - unsafe { DeleteCFileSpec(c_struct.file_specs_ptr) }; - unsafe { DeleteCEachFileStatus(c_struct.each_file_status_ptr) }; + #[cfg(feature = "oh")] + { + unsafe { DeleteCFormItem(c_struct.form_items_ptr) }; + unsafe { DeleteCFileSpec(c_struct.file_specs_ptr) }; + unsafe { DeleteCEachFileStatus(c_struct.each_file_status_ptr) }; + } task_info } } @@ -374,15 +380,20 @@ impl TaskConfig { background: c_struct.common_data.background, }, }; - unsafe { DeleteCFormItem(c_struct.form_items_ptr) }; - unsafe { DeleteCFileSpec(c_struct.file_specs_ptr) }; - unsafe { DeleteCStringPtr(c_struct.body_file_names_ptr) }; - unsafe { DeleteCStringPtr(c_struct.certs_path_ptr) }; + + #[cfg(feature = "oh")] + { + unsafe { DeleteCFormItem(c_struct.form_items_ptr) }; + unsafe { DeleteCFileSpec(c_struct.file_specs_ptr) }; + unsafe { DeleteCStringPtr(c_struct.body_file_names_ptr) }; + unsafe { DeleteCStringPtr(c_struct.certs_path_ptr) }; + } + task_config } } +#[cfg(feature = "oh")] -#[link(name = "download_server_cxx", kind = "static")] extern "C" { pub(crate) fn DeleteCEachFileStatus(ptr: *const CEachFileStatus); } diff --git a/services/src/task/info.rs b/services/src/task/info.rs index 127ed975..5f5e3413 100644 --- a/services/src/task/info.rs +++ b/services/src/task/info.rs @@ -125,24 +125,18 @@ impl State { State::Failed => { from != State::Completed && from != State::Removed && from != State::Stopped } - State::Waiting => from.is_doing(), + State::Waiting => from.is_doing() || from == State::Initialized, State::Running | State::Retrying => { from == State::Waiting || from == State::Paused || from == State::Stopped || from == State::Failed + || from == State::Initialized } State::Removed => from != State::Removed, _ => false, } } - - pub(crate) fn check_resume(&self) -> bool { - *self == State::Waiting - || *self == State::Paused - || *self == State::Failed - || *self == State::Stopped - } } pub(crate) struct UpdateInfo { diff --git a/services/src/task/mod.rs b/services/src/task/mod.rs index 64e993bf..c24b993d 100644 --- a/services/src/task/mod.rs +++ b/services/src/task/mod.rs @@ -11,20 +11,20 @@ // See the License for the specific language governing permissions and // limitations under the License. -pub(crate) mod client; /// request task config pub mod config; /// request task info pub mod info; +pub(crate) mod reason; pub(crate) mod download; -pub(crate) mod ffi; pub(crate) mod files; pub(crate) mod notify; mod operator; -pub(crate) mod reason; pub(crate) mod request_task; -pub(crate) mod upload; - pub(crate) const ATOMIC_SERVICE: u32 = 1; +pub(crate) mod client; + +pub(crate) mod ffi; +pub(crate) mod upload; diff --git a/services/src/task/notify.rs b/services/src/task/notify.rs index f4bcbdc9..3d8be756 100644 --- a/services/src/task/notify.rs +++ b/services/src/task/notify.rs @@ -56,7 +56,8 @@ pub(crate) struct Progress { pub(crate) extras: HashMap, } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq, Eq)] +#[repr(C)] pub(crate) struct EachFileStatus { pub(crate) path: String, pub(crate) reason: Reason, diff --git a/services/src/task/operator.rs b/services/src/task/operator.rs index 91b27f7e..81ea55f0 100644 --- a/services/src/task/operator.rs +++ b/services/src/task/operator.rs @@ -22,12 +22,15 @@ use ylong_http_client::HttpClientError; use ylong_runtime::io::AsyncWrite; use ylong_runtime::time::{sleep, Sleep}; -use super::config::Mode; -use crate::manage::account::is_active_user; -use crate::manage::notifier::Notifier; +cfg_oh! { + use crate::manage::account::is_active_user; + use crate::manage::notifier::Notifier; + use super::config::Mode; + use crate::task::reason::Reason; +} + +use crate::info::State; use crate::task::config::Version; -use crate::task::info::State; -use crate::task::reason::Reason; use crate::task::request_task::RequestTask; use crate::utils::get_current_timestamp; @@ -61,30 +64,40 @@ impl TaskOperator { let current = get_current_timestamp(); let state = self.task.status.lock().unwrap().state; - if (state != State::Running && state != State::Retrying) - || (self.task.conf.version == Version::API10 && !self.task.check_network_status()) - { - info!("pause the task, tid: {}", self.task.task_id()); - return Poll::Ready(Err(HttpClientError::user_aborted())); - } - if !self.task.check_app_state() { - info!("pause for app state, tid: {}", self.task.task_id()); + if state != State::Running && state != State::Retrying { + info!("pause task {} new state {:?} ", self.task.task_id(), state); return Poll::Ready(Err(HttpClientError::user_aborted())); } - if self.task.conf.common_data.mode == Mode::BackGround && !is_active_user(self.task.uid()) { - info!("pause for user stopped, tid: {}", self.task.task_id()); - self.task - .change_task_status(State::Waiting, Reason::AccountStopped); + if self.task.conf.version == Version::API10 && !self.task.check_network_status() { + info!("pause for network status, tid: {}", self.task.task_id()); return Poll::Ready(Err(HttpClientError::user_aborted())); } + #[cfg(feature = "oh")] + { + if !self.task.check_app_state() { + info!("pause for app state, tid: {}", self.task.task_id()); + return Poll::Ready(Err(HttpClientError::user_aborted())); + } + + if self.task.conf.common_data.mode == Mode::BackGround + && !is_active_user(self.task.uid()) + { + info!("pause for user stopped, tid: {}", self.task.task_id()); + self.task + .change_task_status(State::Waiting, Reason::AccountStopped); + return Poll::Ready(Err(HttpClientError::user_aborted())); + } + + if current >= self.task.last_notify.load(Ordering::SeqCst) + FRONT_NOTIFY_INTERVAL { + let notify_data = self.task.build_notify_data(); + self.task.last_notify.store(current, Ordering::SeqCst); + Notifier::progress(&self.task.client_manager, notify_data); + } + } + let version = self.task.conf.version; - if current >= self.task.last_notify.load(Ordering::SeqCst) + FRONT_NOTIFY_INTERVAL { - let notify_data = self.task.build_notify_data(); - self.task.last_notify.store(current, Ordering::SeqCst); - Notifier::progress(&self.task.client_manager, notify_data); - } let gauge = self.task.conf.common_data.gauge; if version == Version::API9 || gauge { @@ -146,38 +159,6 @@ impl TaskOperator { Poll::Ready(Ok(())) } - pub(crate) fn poll_write_partial_file( - &self, - cx: &mut Context<'_>, - data: &[u8], - begins: u64, - ends: i64, - ) -> Poll> { - let data_size = data.len(); - let skip_size = self.task.skip_bytes.load(Ordering::SeqCst); - if skip_size + data_size as u64 <= begins { - self.task - .skip_bytes - .fetch_add(data_size as u64, Ordering::SeqCst); - return Poll::Ready(Ok(data_size)); - } - let remain_skip_bytes = (begins - skip_size) as usize; - let mut data = &data[remain_skip_bytes..]; - self.task.skip_bytes.store(begins, Ordering::SeqCst); - if ends >= 0 { - let total_bytes = ends as u64 - begins + 1; - let written_bytes = self.task.progress.lock().unwrap().processed[0] as u64; - if written_bytes == total_bytes { - return Poll::Ready(Err(HttpClientError::user_aborted())); - } - if data.len() as u64 + written_bytes >= total_bytes { - let remain_bytes = (total_bytes - written_bytes) as usize; - data = &data[..remain_bytes]; - } - } - self.poll_write_file(cx, data, remain_skip_bytes) - } - pub(crate) fn poll_write_file( &self, cx: &mut Context<'_>, diff --git a/services/src/task/request_task.rs b/services/src/task/request_task.rs index 18b1ed3f..f2ab73d5 100644 --- a/services/src/task/request_task.rs +++ b/services/src/task/request_task.rs @@ -11,40 +11,42 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::io::SeekFrom; +use std::io::{self, SeekFrom}; use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU32, AtomicU64, AtomicUsize, Ordering}; use std::sync::{Mutex, MutexGuard}; -use std::thread::sleep; use std::time::Duration; use ylong_http_client::async_impl::{Body, Client, Request, RequestBuilder, Response}; use ylong_http_client::{ErrorKind, HttpClientError}; use ylong_runtime::io::{AsyncSeekExt, AsyncWriteExt}; +cfg_oh! { + use crate::manage::app_state::AppState; + use crate::manage::database::Database; + use crate::manage::notifier::Notifier; + use crate::manage::SystemConfig; + use crate::service::client::ClientManagerEntry; + use super::info::UpdateInfo; + use crate::utils::publish_state_change_event; + use crate::utils::{request_background_notify, RequestTaskMsg}; +} + use super::config::{Mode, Version}; -use super::info::{CommonTaskInfo, State, TaskInfo, UpdateInfo}; +use super::info::{CommonTaskInfo, State, TaskInfo}; use super::notify::{EachFileStatus, NotifyData, Progress}; use super::reason::Reason; use crate::error::ErrorCode; -use crate::manage::app_state::AppState; -use crate::manage::database::Database; -use crate::manage::notifier::Notifier; -use crate::manage::{self, SystemConfig}; -use crate::service::client::ClientManagerEntry; use crate::task::client::build_client; use crate::task::config::{Action, TaskConfig}; use crate::task::files::{AttachedFiles, Files}; use crate::utils::form_item::FileSpec; -use crate::utils::{ - get_current_timestamp, publish_state_change_event, request_background_notify, RequestTaskMsg, -}; +use crate::utils::get_current_timestamp; const RETRY_INTERVAL: u64 = 200; +#[allow(unused)] pub(crate) struct RequestTask { pub(crate) conf: TaskConfig, - pub(crate) app_state: AppState, pub(crate) client: Client, - pub(crate) client_manager: ClientManagerEntry, pub(crate) files: Files, pub(crate) body_files: Files, pub(crate) ctime: u64, @@ -67,7 +69,11 @@ pub(crate) struct RequestTask { pub(crate) upload_counts: AtomicUsize, pub(crate) rate_limiting: AtomicU64, pub(crate) last_notify: AtomicU64, - pub(crate) network: manage::Network, + pub(crate) network: crate::manage::Network, + #[cfg(feature = "oh")] + pub(crate) app_state: AppState, + #[cfg(feature = "oh")] + pub(crate) client_manager: ClientManagerEntry, } impl RequestTask { @@ -107,8 +113,8 @@ impl RequestTask { } } - pub(crate) fn satisfied(&self) -> bool { - if !self.network_online() || !self.check_network_status() { + pub(crate) async fn satisfied(&self) -> bool { + if !self.network_online().await || !self.check_network_status() { error!("check network failed, tid: {}", self.task_id()); false } else { @@ -120,11 +126,11 @@ impl RequestTask { impl RequestTask { pub(crate) fn new( config: TaskConfig, - app_state: AppState, + #[cfg(feature = "oh")] app_state: AppState, files: AttachedFiles, client: Client, - client_manager: ClientManagerEntry, - network: manage::Network, + #[cfg(feature = "oh")] client_manager: ClientManagerEntry, + network: crate::manage::Network, ) -> RequestTask { let file_len = files.files.len(); let action = config.common_data.action; @@ -170,22 +176,27 @@ impl RequestTask { skip_bytes: AtomicU64::new(0), upload_counts: AtomicUsize::new(0), rate_limiting: AtomicU64::new(0), - app_state, last_notify: AtomicU64::new(time), - client_manager, network, + #[cfg(feature = "oh")] + client_manager, + #[cfg(feature = "oh")] + app_state, } } pub(crate) fn new_by_info( config: TaskConfig, - system: SystemConfig, - app_state: AppState, + #[cfg(feature = "oh")] system: SystemConfig, + #[cfg(feature = "oh")] app_state: AppState, info: TaskInfo, - client_manager: ClientManagerEntry, - network: manage::Network, + #[cfg(feature = "oh")] client_manager: ClientManagerEntry, + network: crate::manage::Network, ) -> Result { + #[cfg(feature = "oh")] let (files, client) = check_config(&config, system)?; + #[cfg(not(feature = "oh"))] + let (files, client) = check_config(&config)?; let file_len = files.files.len(); let action = config.common_data.action; @@ -242,10 +253,12 @@ impl RequestTask { skip_bytes: AtomicU64::new(0), upload_counts: AtomicUsize::new(upload_counts), rate_limiting: AtomicU64::new(0), - app_state, last_notify: AtomicU64::new(time), - client_manager, network, + #[cfg(feature = "oh")] + client_manager, + #[cfg(feature = "oh")] + app_state, }) } @@ -270,12 +283,14 @@ impl RequestTask { && self.conf.common_data.retry) { self.change_task_status(State::Failed, Reason::UnsupportedNetworkType); + } else { + self.change_task_status(State::Waiting, Reason::UnsupportedNetworkType); } return false; } true } - + #[cfg(feature = "oh")] pub(crate) fn check_app_state(&self) -> bool { if self.conf.common_data.mode == Mode::FrontEnd && !self.app_state.is_foreground() { if self.conf.common_data.action == Action::Upload { @@ -289,7 +304,7 @@ impl RequestTask { } } - pub(crate) fn network_online(&self) -> bool { + pub(crate) async fn network_online(&self) -> bool { if !self.network.is_online() { if self.conf.version == Version::API10 && self.conf.common_data.mode == Mode::BackGround @@ -302,7 +317,7 @@ impl RequestTask { if self.network.is_online() { return true; } - sleep(Duration::from_millis(RETRY_INTERVAL)); + ylong_runtime::time::sleep(Duration::from_millis(RETRY_INTERVAL)); } self.change_task_status(State::Failed, Reason::NetworkOffline); } @@ -311,6 +326,23 @@ impl RequestTask { true } + pub(crate) fn prepare_running(&self) { + let old_state = self.status.lock().unwrap().state; + info!( + "task {} prepare running, action: {:?}, state :{:?}", + self.task_id(), + self.action(), + old_state, + ); + #[cfg(feature = "oh")] + if old_state == State::Paused { + let notify_data = self.build_notify_data(); + Notifier::resume(&self.client_manager, notify_data); + } + + self.change_task_status(State::Running, Reason::Default); + } + pub(crate) fn build_request_builder(&self) -> Result { use ylong_http_client::async_impl::PercentEncoder; @@ -346,111 +378,99 @@ impl RequestTask { Ok(request) } - async fn clear_downloaded_file(&self) -> bool { + pub(crate) async fn clear_downloaded_file(&self) -> Result<(), std::io::Error> { + info!("task {} clear downloaded file", self.task_id()); let file = self.files.get_mut(0).unwrap(); - let res = file.set_len(0).await; - match res { - Err(e) => { - error!("clear download file error: {:?}", e); - self.change_task_status(State::Failed, Reason::IoError); - false - } - _ => { - debug!("set len success"); - match file.seek(SeekFrom::Start(0)).await { - Err(e) => { - error!("seek err is {:?}", e); - self.change_task_status(State::Failed, Reason::IoError); - false - } - Ok(_) => { - debug!("seek success"); - let mut progress_guard = self.progress.lock().unwrap(); - progress_guard.common_data.total_processed = 0; - progress_guard.processed[0] = 0; - true - } - } - } - } + file.set_len(0).await?; + file.seek(SeekFrom::Start(0)).await?; + + let mut progress_guard = self.progress.lock().unwrap(); + progress_guard.common_data.total_processed = 0; + progress_guard.processed[0] = 0; + + Ok(()) } - pub(crate) async fn build_download_request(&self) -> Option { - let mut request_builder = match self.build_request_builder() { - Ok(builder) => builder, - _ => { - self.change_task_status(State::Failed, Reason::BuildRequestFailed); - return None; + pub(crate) async fn build_download_request(&self) -> Result { + let mut request_builder = self.build_request_builder()?; + + let file = self.files.get_mut(0).unwrap(); + + let has_downloaded = file.metadata().await?.len(); + let resume_download = has_downloaded > 0; + let require_range = self.require_range(); + + let begins = self.conf.common_data.begins; + let ends = self.conf.common_data.ends; + + info!( + "task {} build download request, resume_download: {}, require_range: {}", + self.task_id(), + resume_download, + require_range + ); + match (resume_download, require_range) { + (true, false) => { + let (builder, support_range) = self.support_range(request_builder); + request_builder = builder; + if support_range { + request_builder = + self.range_request(request_builder, begins + has_downloaded, ends); + } else { + self.clear_downloaded_file().await?; + } } + (false, true) => { + request_builder = self.range_request(request_builder, begins, ends); + } + (true, true) => { + let (builder, support_range) = self.support_range(request_builder); + request_builder = builder; + if support_range { + request_builder = + self.range_request(request_builder, begins + has_downloaded, ends); + } else { + return Err(TaskError::Failed(Reason::UnsupportedRangeRequest)); + } + } + (false, false) => {} }; - let mut begins = self.conf.common_data.begins; - let ends = self.conf.common_data.ends; - self.range_response.store(false, Ordering::SeqCst); - if self.resume.load(Ordering::SeqCst) || begins > 0 || ends >= 0 { - self.range_request.store(true, Ordering::SeqCst); - self.skip_bytes.store(0, Ordering::SeqCst); - if self.resume.load(Ordering::SeqCst) { - let if_range = { - let progress_guard = self.progress.lock().unwrap(); - let etag = progress_guard.extras.get("etag"); - let last_modified = progress_guard.extras.get("last-modified"); - if let Some(etag) = etag { - request_builder = request_builder.header("If-Range", etag.as_str()); - true - } else if let Some(last_modified) = last_modified { - request_builder = - request_builder.header("If-Range", last_modified.as_str()); - true - } else { - false - } - }; - if !if_range { - // unable to verify file consistency, need download again - if begins == 0 && ends < 0 { - self.range_request.store(false, Ordering::SeqCst); - } - if !self.clear_downloaded_file().await { - return None; - } - } - } - let file = self.files.get_mut(0).unwrap(); - let current_len = file.metadata().await.unwrap().len(); - begins += current_len; - // Modifys the progress to the current file size. - // It will be recorded to the database later during download. - let mut progress_guard = self.progress.lock().unwrap(); - progress_guard.processed[0] = current_len as usize; - progress_guard.common_data.total_processed = current_len as usize; - if self.range_request.load(Ordering::SeqCst) { - let range = if ends < 0 { - format!("bytes={begins}-") - } else { - format!("bytes={begins}-{ends}") - }; - request_builder = request_builder.header("Range", range.as_str()); - } - } else { - self.range_request.store(false, Ordering::SeqCst); - } - let result = request_builder.body(Body::slice(self.conf.data.clone())); - match result { - Ok(value) => Some(value), - Err(e) => { - error!("build download request error is {:?}", e); - self.change_task_status(State::Failed, Reason::BuildRequestFailed); - None - } - } + let request = request_builder.body(Body::slice(self.conf.data.clone()))?; + Ok(request) } - pub(crate) fn get_file_info(&self, response: &Response) -> bool { - if self.get_file_info.load(Ordering::SeqCst) { - return true; + fn range_request( + &self, + request_builder: RequestBuilder, + begins: u64, + ends: i64, + ) -> RequestBuilder { + let range = if ends < 0 { + format!("bytes={begins}-") + } else { + format!("bytes={begins}-{ends}") + }; + request_builder.header("Range", range.as_str()) + } + + fn support_range(&self, mut request_builder: RequestBuilder) -> (RequestBuilder, bool) { + let progress_guard = self.progress.lock().unwrap(); + let mut support_range = false; + if let Some(etag) = progress_guard.extras.get("etag") { + request_builder = request_builder.header("If-Range", etag.as_str()); + support_range = true; + } else if let Some(last_modified) = progress_guard.extras.get("last-modified") { + request_builder = request_builder.header("If-Range", last_modified.as_str()); + support_range = true; } - self.get_file_info.store(true, Ordering::SeqCst); + if !support_range { + info!("task {} does not support range request", self.task_id()); + } + (request_builder, support_range) + } + + pub(crate) fn get_file_info(&self, response: &Response) -> Result<(), TaskError> { let content_type = response.headers().get("content-type"); if let Some(mime_type) = content_type { if let Ok(value) = mime_type.to_string() { @@ -459,167 +479,72 @@ impl RequestTask { } let content_length = response.headers().get("content-length"); - if let Some(len) = content_length { - let length = len.to_string(); - match length { - Ok(value) => { - let len = value.parse::(); - match len { - Ok(v) => { - let mut guard = self.progress.lock().unwrap(); - if !self.restored.load(Ordering::SeqCst) { - guard.sizes[0] = v + guard.processed[0] as i64; - } - self.file_total_size.store(v, Ordering::SeqCst); - debug!("the download task content-length is {}", v); - } - Err(e) => { - error!("convert string to i64 error: {:?}", e); - } - } + if let Some(Ok(len)) = content_length.map(|v| v.to_string()) { + match len.parse::() { + Ok(v) => { + let mut progress = self.progress.lock().unwrap(); + progress.sizes = vec![v + progress.processed[0] as i64]; + self.file_total_size.store(v, Ordering::SeqCst); + debug!("the download task content-length is {}", v); } Err(e) => { - error!("convert header value to string error: {:?}", e); + error!("convert string to i64 error: {:?}", e); } } } else { error!("cannot get content-length of the task"); if self.conf.common_data.precise { - self.change_task_status(State::Failed, Reason::GetFileSizeFailed); - return false; + return Err(TaskError::Failed(Reason::GetFileSizeFailed)); } } - true + Ok(()) } - async fn handle_body_transfer_error(&self) { + pub(crate) async fn handle_body_transfer_error(&self) -> Result<(), TaskError> { + #[cfg(feature = "oh")] if self.network.check_interval_online().await { - self.change_task_status(State::Failed, Reason::OthersError); + return Err(TaskError::Failed(Reason::OthersError)); } else { match self.conf.version { Version::API9 => { if self.conf.common_data.action == Action::Upload { - self.change_task_status(State::Failed, Reason::NetworkOffline); + return Err(TaskError::Failed(Reason::NetworkOffline)); } } Version::API10 => { if self.conf.common_data.mode == Mode::FrontEnd || !self.conf.common_data.retry { - self.change_task_status(State::Failed, Reason::NetworkOffline); + return Err(TaskError::Failed(Reason::NetworkOffline)); } } } } + Err(TaskError::Waiting(TaskPhase::NetworkOffline)) } - pub(crate) async fn handle_download_error(&self, result: &Result<(), HttpClientError>) -> bool { - match result { - Ok(_) => true, - Err(err) => { - error!("download err is {:?}", err); - match err.error_kind() { - ErrorKind::Timeout => { - self.change_task_status(State::Failed, Reason::ContinuousTaskTimeout); - } - // user triggered - ErrorKind::UserAborted => return true, - ErrorKind::BodyTransfer | ErrorKind::BodyDecode => { - self.handle_body_transfer_error().await; - } - _ => { - if format!("{}", err).contains("No space left on device") { - self.change_task_status(State::Failed, Reason::InsufficientSpace); - } else { - self.change_task_status(State::Failed, Reason::OthersError); - } - } - } - false - } - } - } - - pub(crate) async fn handle_response_error( + pub(crate) async fn handle_download_error( &self, - response: &Result, - ) -> bool { - let index = self.progress.lock().unwrap().common_data.index; - match response { - Ok(r) => { - let http_response_code = r.status(); - info!( - "task {} get http response code {}", - self.conf.common_data.task_id, http_response_code - ); - if http_response_code.is_server_error() - || (http_response_code.as_u16() != 408 && http_response_code.is_client_error()) - || http_response_code.is_redirection() - { - self.set_code(index, Reason::ProtocolError, false); - return false; - } - if http_response_code.as_u16() == 408 { - if !self.retry_for_request.load(Ordering::SeqCst) { - self.retry_for_request.store(true, Ordering::SeqCst); - } else { - self.set_code(index, Reason::ProtocolError, false); - } - return false; - } - - if self.range_request.load(Ordering::SeqCst) { - match http_response_code.as_u16() { - 206 => { - self.range_response.store(true, Ordering::SeqCst); - } - 200 => { - self.range_response.store(false, Ordering::SeqCst); - if self.resume.load(Ordering::SeqCst) { - if !self.clear_downloaded_file().await { - return false; - } - } else { - self.set_code(index, Reason::UnsupportedRangeRequest, false); - return false; - } - } - _ => {} - } - } - true + err: HttpClientError, + ) -> Result<(), TaskError> { + error!("download err is {:?}", err); + match err.error_kind() { + ErrorKind::Timeout => Err(TaskError::Failed(Reason::ContinuousTaskTimeout)), + // user triggered + ErrorKind::UserAborted => Err(TaskError::Waiting(TaskPhase::UserAbort)), + ErrorKind::BodyTransfer | ErrorKind::BodyDecode => { + self.handle_body_transfer_error().await } - Err(e) => { - error!("http client err is {:?}", e); - match e.error_kind() { - ErrorKind::UserAborted => self.set_code(index, Reason::UserOperation, false), - ErrorKind::Timeout => { - self.set_code(index, Reason::ContinuousTaskTimeout, false) - } - ErrorKind::Request => self.set_code(index, Reason::RequestError, false), - ErrorKind::Redirect => self.set_code(index, Reason::RedirectError, false), - ErrorKind::Connect | ErrorKind::ConnectionUpgrade => { - if e.is_dns_error() { - self.set_code(index, Reason::Dns, false); - } else if e.is_tls_error() { - self.set_code(index, Reason::Ssl, false); - } else { - self.set_code(index, Reason::Tcp, false); - } - } - ErrorKind::BodyTransfer => self.handle_body_transfer_error().await, - _ => { - if format!("{}", e).contains("No space left on device") { - self.set_code(index, Reason::InsufficientSpace, false); - } else { - self.set_code(index, Reason::OthersError, false); - } - } + _ => { + if format!("{}", err).contains("No space left on device") { + Err(TaskError::Failed(Reason::InsufficientSpace)) + } else { + Err(TaskError::Failed(Reason::OthersError)) } - false } } } + #[cfg(feature = "oh")] pub(crate) fn notify_response(&self, response: &Response) { let tid = self.conf.common_data.task_id; let version: String = response.version().as_str().into(); @@ -637,17 +562,8 @@ impl RequestTask { .send_response(tid, version, status_code, status_message, headers) } - pub(crate) fn record_response_header(&self, response: &Result) { - if let Ok(r) = response { - self.notify_response(r); - let mut guard = self.progress.lock().unwrap(); - guard.extras.clear(); - for (k, v) in r.headers() { - if let Ok(value) = v.to_string() { - guard.extras.insert(k.to_string().to_lowercase(), value); - } - } - } + pub(crate) fn require_range(&self) -> bool { + self.conf.common_data.begins > 0 || self.conf.common_data.ends >= 0 } pub(crate) async fn record_upload_response( @@ -655,7 +571,6 @@ impl RequestTask { index: usize, response: Result, ) { - self.record_response_header(&response); if let Ok(mut r) = response { let file = match self.body_files.get_mut(index) { Some(file) => file, @@ -681,10 +596,6 @@ impl RequestTask { } pub(crate) fn set_code(&self, index: usize, code: Reason, is_force: bool) { - // why? - if code == Reason::UploadFileError { - return; - } // `unwrap` for propagating panics among threads. let mut codes_guard = self.code.lock().unwrap(); match codes_guard.get_mut(index) { @@ -694,7 +605,7 @@ impl RequestTask { } } None => { - info!( + error!( "set code index error; tid: {}, index: {}, code: {:?}", self.conf.common_data.task_id, index, code ); @@ -736,22 +647,27 @@ impl RequestTask { } else { self.set_code(index, to_reason, false); } - let codes_guard = self.code.lock().unwrap(); - let update_info = UpdateInfo { - mtime: task_status.mtime, - reason: task_status.reason.repr, - progress: progress.clone(), - each_file_status: RequestTask::get_each_file_status_by_code( - &codes_guard, - &self.conf.file_specs, - ), - tries: self.tries.load(Ordering::SeqCst), - mime_type: self.mime_type.lock().unwrap().clone(), - }; - Database::get_instance().update_task(self.task_id(), update_info); + #[cfg(feature = "oh")] + { + let codes_guard = self.code.lock().unwrap(); + let update_info = UpdateInfo { + mtime: task_status.mtime, + reason: task_status.reason.repr, + progress: progress.clone(), + each_file_status: RequestTask::get_each_file_status_by_code( + &codes_guard, + &self.conf.file_specs, + ), + tries: self.tries.load(Ordering::SeqCst), + mime_type: self.mime_type.lock().unwrap().clone(), + }; + Database::get_instance().update_task(self.task_id(), update_info); + } + ErrorCode::ErrOk } + #[cfg(feature = "oh")] pub(crate) fn state_change_notify(&self) { let state = self.status.lock().unwrap().state; let total_processed = self.progress.lock().unwrap().common_data.total_processed; @@ -794,6 +710,7 @@ impl RequestTask { self.background_notify(); } + #[cfg(feature = "oh")] pub(crate) fn state_change_notify_of_no_run( client_manager: &ClientManagerEntry, notify_data: NotifyData, @@ -941,21 +858,25 @@ impl RequestTask { if index >= self.conf.file_specs.len() { return; } - let percent = total_processed * 100 / (file_total_size as u64); - debug!("background notify"); - let task_msg = RequestTaskMsg { - task_id: self.conf.common_data.task_id, - uid: self.conf.common_data.uid as i32, - action: self.conf.common_data.action.repr, - }; - let path = self.conf.file_specs[index].path.as_str(); - let file_name = self.conf.file_specs[index].file_name.as_str(); - let _ = request_background_notify(task_msg, path, file_name, percent as u32); + #[cfg(feature = "oh")] + { + let percent = total_processed * 100 / (file_total_size as u64); + let task_msg = RequestTaskMsg { + task_id: self.conf.common_data.task_id, + uid: self.conf.common_data.uid as i32, + action: self.conf.common_data.action.repr, + }; + + let path = self.conf.file_specs[index].path.as_str(); + let file_name = self.conf.file_specs[index].file_name.as_str(); + let _ = request_background_notify(task_msg, path, file_name, percent as u32); + } } pub(crate) fn get_upload_info(&self, index: usize) -> (bool, u64) { let guard = self.progress.lock().unwrap(); + let file_size = guard.sizes[index]; let mut is_partial_upload = false; let mut upload_file_length: u64 = file_size as u64 - guard.processed[index] as u64; @@ -978,6 +899,7 @@ impl RequestTask { (is_partial_upload, upload_file_length) } + #[cfg(feature = "oh")] pub(crate) fn notify_header_receive(&self) { if self.conf.version == Version::API9 && self.conf.common_data.action == Action::Upload { let notify_data = self.build_notify_data(); @@ -1025,12 +947,41 @@ fn check_file_specs(file_specs: &[FileSpec]) -> bool { pub(crate) fn check_config( config: &TaskConfig, - system: SystemConfig, + #[cfg(feature = "oh")] system: SystemConfig, ) -> Result<(AttachedFiles, Client), ErrorCode> { if !check_file_specs(&config.file_specs) { return Err(ErrorCode::Other); } let files = AttachedFiles::open(config).map_err(|_| ErrorCode::FileOperationErr)?; + #[cfg(feature = "oh")] let client = build_client(config, system).map_err(|_| ErrorCode::Other)?; + + #[cfg(not(feature = "oh"))] + let client = build_client(config).map_err(|_| ErrorCode::Other)?; Ok((files, client)) } + +impl From for TaskError { + fn from(_value: HttpClientError) -> Self { + TaskError::Failed(Reason::BuildRequestFailed) + } +} + +impl From for TaskError { + fn from(_value: io::Error) -> Self { + TaskError::Failed(Reason::IoError) + } +} + +#[derive(Debug, PartialEq, Eq)] +pub enum TaskPhase { + NeedRetry, + UserAbort, + NetworkOffline, +} + +#[derive(Debug, PartialEq, Eq)] +pub enum TaskError { + Failed(Reason), + Waiting(TaskPhase), +} diff --git a/services/src/task/upload.rs b/services/src/task/upload.rs index 7d6c0a6f..f21bc7d7 100644 --- a/services/src/task/upload.rs +++ b/services/src/task/upload.rs @@ -18,11 +18,12 @@ use std::sync::Arc; use std::task::{Context, Poll}; use ylong_http_client::async_impl::{Body, MultiPart, Part, Request, UploadOperator, Uploader}; -use ylong_http_client::HttpClientError; -use ylong_runtime::io::{AsyncRead, AsyncSeek, ReadBuf}; +use ylong_http_client::{ErrorKind, HttpClientError}; +use ylong_runtime::io::{AsyncRead, AsyncSeek, AsyncSeekExt, ReadBuf}; use super::operator::TaskOperator; use super::reason::Reason; +use super::request_task::{TaskError, TaskPhase}; use crate::task::info::State; use crate::task::request_task::RequestTask; #[cfg(feature = "oh")] @@ -30,11 +31,12 @@ use crate::trace::Trace; struct TaskReader { pub(crate) task: Arc, + pub(crate) index: usize, } impl TaskReader { - pub(crate) fn new(task: Arc) -> Self { - Self { task } + pub(crate) fn new(task: Arc, index: usize) -> Self { + Self { task, index } } } @@ -44,7 +46,7 @@ impl AsyncRead for TaskReader { cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll> { - let index = self.task.progress.lock().unwrap().common_data.index; + let index = self.index; let file = self.task.files.get_mut(index).unwrap(); let (is_partial_upload, total_upload_bytes) = self.task.get_upload_info(index); let mut progress_guard = self.task.progress.lock().unwrap(); @@ -108,7 +110,7 @@ impl UploadOperator for TaskOperator { fn build_stream_request(task: Arc, index: usize) -> Option { debug!("build stream request"); - let task_reader = TaskReader::new(task.clone()); + let task_reader = TaskReader::new(task.clone(), index); let task_operator = TaskOperator::new(task.clone()); match task.build_request_builder() { @@ -135,7 +137,7 @@ fn build_stream_request(task: Arc, index: usize) -> Option fn build_multipart_request(task: Arc, index: usize) -> Option { debug!("build multipart request"); - let task_reader = TaskReader::new(task.clone()); + let task_reader = TaskReader::new(task.clone(), index); let task_operator = TaskOperator::new(task.clone()); let mut multi_part = MultiPart::new(); for item in task.conf.form_items.iter() { @@ -190,8 +192,46 @@ fn build_request_common( } } +impl RequestTask { + fn prepare_single_upload(&self, index: usize) -> bool { + if let Some(file) = self.files.get_mut(index) { + let mut progress = self.progress.lock().unwrap(); + progress.common_data.index = index; + progress.common_data.total_processed = 0; + progress.processed[index] = 0; + file.seek(SeekFrom::Start(0)); + true + } else { + error!("task {} file {} not found", self.task_id(), index); + false + } + } +} + pub(crate) async fn upload(task: Arc) { - debug!("begin upload task, tid: {}", task.conf.common_data.task_id); + loop { + if let Err(e) = upload_inner(task.clone()).await { + match e { + TaskError::Failed(reason) => { + task.change_task_status(State::Failed, reason); + } + TaskError::Waiting(phase) => match phase { + TaskPhase::NeedRetry => { + continue; + } + TaskPhase::UserAbort => {} + TaskPhase::NetworkOffline => {} + }, + } + } + break; + } +} + +async fn upload_inner(task: Arc) -> Result<(), TaskError> { + task.prepare_running(); + + info!("upload task {} start running", task.task_id()); #[cfg(feature = "oh")] { @@ -202,81 +242,44 @@ pub(crate) async fn upload(task: Arc) { } let size = task.conf.file_specs.len(); - loop { - let index = task.progress.lock().unwrap().common_data.index; + let start = task.progress.lock().unwrap().common_data.index; + + for index in start..size { + if !task.prepare_single_upload(index) { + return Err(TaskError::Failed(Reason::OthersError)); + } let is_multipart = match task.conf.headers.get("Content-Type") { Some(s) => s.eq("multipart/form-data"), None => task.conf.method.to_uppercase().eq("POST"), }; - let result = if is_multipart { - upload_one_file(task.clone(), index, build_multipart_request).await + if is_multipart { + upload_one_file(task.clone(), index, build_multipart_request).await? } else { - upload_one_file(task.clone(), index, build_stream_request).await + upload_one_file(task.clone(), index, build_stream_request).await? }; - - if result { - info!( - "upload one file success, tid: {}, index is {}, size is {}", - task.conf.common_data.task_id, index, size - ); - task.upload_counts.fetch_add(1, Ordering::SeqCst); - } - - { - let task_status = task.status.lock().unwrap(); - if !task_status.state.is_doing() { - return; - } - } - - task.notify_header_receive(); let mut progress = task.progress.lock().unwrap(); - if index + 1 == size { - break; - } - progress.common_data.index += 1; + progress.common_data.index = 0; + #[cfg(feature = "oh")] + task.notify_header_receive(); } - let uploaded = task.upload_counts.load(Ordering::SeqCst); - if uploaded == size { - task.change_task_status(State::Completed, Reason::Default); - } else { - task.change_task_status(State::Failed, Reason::UploadFileError); - - use hisysevent::{build_number_param, build_str_param}; - - use crate::sys_event::SysEvent; - // Records sys event. - - SysEvent::task_fault() - .param(build_str_param!(crate::sys_event::TASKS_TYPE, "UPLOAD")) - .param(build_number_param!(crate::sys_event::TOTAL_FILE_NUM, size)) - .param(build_number_param!( - crate::sys_event::FAIL_FILE_NUM, - size - uploaded - )) - .param(build_number_param!( - crate::sys_event::SUCCESS_FILE_NUM, - uploaded - )) - .param(build_number_param!( - crate::sys_event::ERROR_INFO, - Reason::UploadFileError.repr as i32 - )) - .write(); - } - - debug!("upload end, tid: {}", task.conf.common_data.task_id); + task.change_task_status(State::Completed, Reason::Default); + info!("task {} upload success", task.task_id()); + Ok(()) } -async fn upload_one_file(task: Arc, index: usize, build_upload_request: F) -> bool +async fn upload_one_file( + task: Arc, + index: usize, + build_upload_request: F, +) -> Result<(), TaskError> where F: Fn(Arc, usize) -> Option, { info!( "begin upload one file, tid: {}, index is {}", - task.conf.common_data.task_id, index + task.conf.common_data.task_id, index, ); // Ensures `_trace` can only be freed when this function exits. @@ -289,43 +292,148 @@ where )); } - loop { - task.set_code(index, Reason::Default, true); - let request: Option = build_upload_request(task.clone(), index); - if request.is_none() { - return false; - } - let response = task.client.request(request.unwrap()).await; - if task.handle_response_error(&response).await { - // `unwrap` for propagating panics among threads. - if let Some(code) = task.code.lock().unwrap().get_mut(index) { - *code = Reason::Default; - } - task.record_upload_response(index, response).await; - return true; - } - task.record_upload_response(index, response).await; - // `unwrap` for propagating panics among threads. + let Some(request) = build_upload_request(task.clone(), index) else { + return Err(TaskError::Failed(Reason::BuildRequestFailed)); + }; - let state = task.status.lock().unwrap().state; - if state != State::Running && state != State::Retrying { - return false; - } - let code = *task - .code - .lock() - .unwrap() - .get(index) - .unwrap_or(&Reason::OthersError); - if code != Reason::Default { - error!( - "upload {} file fail, which reason is {}", - index, code.repr as u32 + let response = task.client.request(request).await; + match response.as_ref() { + Ok(response) => { + let status_code = response.status(); + info!( + "task {} get http response code {}", + task.conf.common_data.task_id, status_code, ); - if code != Reason::UserOperation { - task.change_task_status(State::Failed, code); + if status_code.is_server_error() + || (status_code.as_u16() != 408 && status_code.is_client_error()) + || status_code.is_redirection() + { + return Err(TaskError::Failed(Reason::ProtocolError)); + } + if status_code.as_u16() == 408 { + if task.tries.load(Ordering::SeqCst) < 2 { + task.tries.fetch_add(1, Ordering::SeqCst); + info!("task {} server timeout", task.task_id()); + return Err(TaskError::Waiting(TaskPhase::NeedRetry)); + } else { + info!("task {} retry 3 times", task.task_id()); + return Err(TaskError::Failed(Reason::ProtocolError)); + } } - return false; } + Err(e) => { + error!("Task {} {:?}", task.task_id(), e); + + match e.error_kind() { + ErrorKind::Timeout => return Err(TaskError::Failed(Reason::ContinuousTaskTimeout)), + ErrorKind::Request => return Err(TaskError::Failed(Reason::RequestError)), + ErrorKind::Redirect => return Err(TaskError::Failed(Reason::RedirectError)), + ErrorKind::Connect | ErrorKind::ConnectionUpgrade => { + if task.tries.load(Ordering::SeqCst) < 2 { + task.tries.fetch_add(1, Ordering::SeqCst); + return Err(TaskError::Waiting(TaskPhase::NeedRetry)); + } + info!("task {} retry 3 times", task.task_id()); + if e.is_dns_error() { + return Err(TaskError::Failed(Reason::Dns)); + } else if e.is_tls_error() { + return Err(TaskError::Failed(Reason::Ssl)); + } else { + return Err(TaskError::Failed(Reason::Tcp)); + } + } + ErrorKind::BodyTransfer => return task.handle_body_transfer_error().await, + ErrorKind::UserAborted => return Err(TaskError::Waiting(TaskPhase::UserAbort)), + _ => { + if format!("{}", e).contains("No space left on device") { + return Err(TaskError::Failed(Reason::InsufficientSpace)); + } else { + return Err(TaskError::Failed(Reason::OthersError)); + } + } + }; + } + }; + task.record_upload_response(index, response).await; + Ok(()) +} + +#[cfg(not(feature = "oh"))] +#[cfg(test)] +mod test { + use std::fs::File; + use std::io::{BufRead, BufReader, Write}; + use std::net::{TcpListener, TcpStream}; + use std::sync::Arc; + + use crate::config::{Action, ConfigBuilder, Mode, TaskConfig}; + use crate::info::State; + use crate::manage::Network; + use crate::task::request_task::{check_config, RequestTask, TaskError, TaskPhase}; + use crate::task::upload::upload_inner; + + fn build_task(config: TaskConfig) -> Arc { + let (files, client) = check_config(&config).unwrap(); + let network = Network::new(); + let task = Arc::new(RequestTask::new(config, files, client, network)); + task.status.lock().unwrap().state = State::Initialized; + task + } + + fn init() { + let _ = env_logger::builder().is_test(true).try_init(); + let _ = std::fs::create_dir("test_files/"); + static ONCE: std::sync::Once = std::sync::Once::new(); + ONCE.call_once(|| { + info!("server start 127.0.0.1:7878"); + std::thread::spawn(|| { + let listener = TcpListener::bind("127.0.0.1:7878").unwrap(); + for stream in listener.incoming() { + std::thread::sleep(std::time::Duration::from_secs(2)); + let stream = stream.unwrap(); + handle_connection(stream); + } + }); + }) + } + + fn handle_connection(mut stream: TcpStream) { + let buf_reader = BufReader::new(&mut stream); + let http_request: Vec<_> = buf_reader + .lines() + .map(|result| result.unwrap()) + .take_while(|line| !line.is_empty()) + .collect(); + debug!("http request: {:#?}", http_request); + let response = "HTTP/1.1 200 OK\r\n\r\n"; + stream.write_all(response.as_bytes()).unwrap(); + } + + #[test] + fn ut_upload_basic() { + init(); + let file_path = "test_files/ut_upload_basic.txt"; + + let file = File::options() + .read(true) + .write(true) + .create(true) + .open(file_path) + .unwrap(); + file.set_len(100000).unwrap(); + + let config = ConfigBuilder::new() + .action(Action::Upload) + .method("POST") + .mode(Mode::BackGround) + .file_spec(file) + .url("http://127.0.0.1:7878/") + .redirect(true) + .build(); + let task = build_task(config); + + ylong_runtime::block_on(async { + upload_inner(task).await.unwrap(); + }) } } diff --git a/services/src/utils/c_wrapper.rs b/services/src/utils/c_wrapper.rs index 06c1532c..201395a2 100644 --- a/services/src/utils/c_wrapper.rs +++ b/services/src/utils/c_wrapper.rs @@ -40,12 +40,18 @@ impl From<&String> for CStringWrapper { impl ToString for CStringWrapper { fn to_string(&self) -> String { if self.c_str.is_null() || self.len == 0 { - unsafe { DeleteChar(self.c_str) }; + #[cfg(feature = "oh")] + unsafe { + DeleteChar(self.c_str) + }; return String::new(); } let bytes = unsafe { slice::from_raw_parts(self.c_str as *const u8, self.len as usize) }; let str = unsafe { String::from_utf8_unchecked(bytes.to_vec()) }; - unsafe { DeleteChar(self.c_str) }; + #[cfg(feature = "oh")] + unsafe { + DeleteChar(self.c_str) + }; str } } @@ -104,7 +110,7 @@ impl FormItem { } } -#[link(name = "download_server_cxx", kind = "static")] +#[cfg(feature = "oh")] extern "C" { pub(crate) fn DeleteChar(ptr: *const c_char); pub(crate) fn DeleteCFormItem(ptr: *const CFormItem); diff --git a/services/src/utils/form_item.rs b/services/src/utils/form_item.rs index e23daa4f..d5d889d1 100644 --- a/services/src/utils/form_item.rs +++ b/services/src/utils/form_item.rs @@ -12,7 +12,7 @@ // limitations under the License. use std::fs::File; -use std::os::fd::{AsRawFd, RawFd}; +use std::os::fd::{IntoRawFd, RawFd}; /// File Spec #[derive(Clone, Debug)] @@ -33,14 +33,14 @@ pub struct FileSpec { impl FileSpec { /// Create a new file spec with user file. - pub fn user_file(f: &File) -> Self { + pub fn user_file(file: File) -> Self { Self { name: "".to_string(), path: "".to_string(), file_name: "".to_string(), mime_type: "".to_string(), is_user_file: true, - fd: Some(f.as_raw_fd()), + fd: Some(file.into_raw_fd()), } } } diff --git a/services/src/utils/mod.rs b/services/src/utils/mod.rs index dbef44f4..d400e01a 100644 --- a/services/src/utils/mod.rs +++ b/services/src/utils/mod.rs @@ -13,14 +13,17 @@ pub(crate) mod c_wrapper; pub(crate) mod form_item; -pub(crate) mod task_id_generator; -pub(crate) mod url_policy; use std::collections::HashMap; use std::future::Future; use std::io::Write; use std::time::{SystemTime, UNIX_EPOCH}; -pub(crate) use ffi::RequestTaskMsg; +cfg_oh! { + pub(crate) use ffi::RequestTaskMsg; + pub(crate) mod url_policy; + pub(crate) mod task_id_generator; +} + use ylong_runtime::sync::oneshot::Receiver; use ylong_runtime::task::JoinHandle; @@ -96,6 +99,7 @@ pub(crate) fn runtime_spawn + Send + Sync + 'static>( )) } +#[cfg(feature = "oh")] pub(crate) fn query_app_state(uid: u64) -> ApplicationState { let top_uid = query_top_uid(); match top_uid { @@ -110,6 +114,7 @@ pub(crate) fn query_app_state(uid: u64) -> ApplicationState { } } +#[cfg(feature = "oh")] fn query_top_uid() -> Option { let mut uid = 0; for i in 0..10 { @@ -126,21 +131,25 @@ fn query_top_uid() -> Option { None } +#[cfg(feature = "oh")] pub(crate) fn query_calling_bundle() -> String { let token_id = ipc::Skeleton::calling_full_token_id(); ffi::GetCallingBundle(token_id) } +#[cfg(feature = "oh")] pub(crate) fn is_system_api() -> bool { let token_id = ipc::Skeleton::calling_full_token_id(); ffi::IsSystemAPI(token_id) } +#[cfg(feature = "oh")] pub(crate) fn check_permission(permission: &str) -> bool { let token_id = ipc::Skeleton::calling_full_token_id(); ffi::CheckPermission(token_id, permission) } +#[cfg(feature = "oh")] pub(crate) fn publish_state_change_event( bundle_name: &str, task_id: u32, @@ -152,6 +161,7 @@ pub(crate) fn publish_state_change_event( } } +#[cfg(feature = "oh")] pub(crate) fn request_background_notify( msg: RequestTaskMsg, wrapped_path: &str, @@ -163,17 +173,14 @@ pub(crate) fn request_background_notify( code => Err(code), } } - #[cxx::bridge(namespace = "OHOS::Request")] mod ffi { - pub(crate) struct RequestTaskMsg { + struct RequestTaskMsg { pub(crate) task_id: u32, pub(crate) uid: i32, pub(crate) action: u8, } - unsafe extern "C++" {} - unsafe extern "C++" { include!("request_utils.h"); @@ -193,6 +200,7 @@ mod ffi { } } +#[cfg(feature = "oh")] #[cfg(test)] mod test { use super::*; diff --git a/services/src/utils/url_policy.rs b/services/src/utils/url_policy.rs index 7dadef89..b942fece 100644 --- a/services/src/utils/url_policy.rs +++ b/services/src/utils/url_policy.rs @@ -21,7 +21,6 @@ pub(crate) fn check_url_domain(app_id: &str, domain_type: &str, url: &str) -> Op } } -#[link(name = "download_server_cxx", kind = "static")] extern "C" { pub(crate) fn PolicyCheckUrlDomain( app_id: CStringWrapper, diff --git a/services/tests/BUILD.gn b/services/tests/BUILD.gn index ee3eaf9f..1363a648 100644 --- a/services/tests/BUILD.gn +++ b/services/tests/BUILD.gn @@ -44,7 +44,10 @@ ohos_rust_unittest("rust_request_ut_test") { ohos_rust_unittest("rust_request_sdv_test") { module_out_path = "request/request_rust" - rustflags = [ "--cfg=gn_test" ] + rustflags = [ + "--cfg=gn_test", + "--cfg=feature=\"oh\"", + ] sources = [ "entry.rs" ] diff --git a/services/tests/construct.rs b/services/tests/construct.rs index e4a0d6f2..c6b4393b 100644 --- a/services/tests/construct.rs +++ b/services/tests/construct.rs @@ -11,6 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +#![cfg(feature = "oh")] use download_server::config::ConfigBuilder; use test_common::test_init; #[test] diff --git a/services/tests/entry.rs b/services/tests/entry.rs index 9f06852e..eb37507e 100644 --- a/services/tests/entry.rs +++ b/services/tests/entry.rs @@ -11,8 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -#![cfg(gn_test)] - mod construct; +mod resume; mod search; mod start; diff --git a/services/tests/resume.rs b/services/tests/resume.rs new file mode 100644 index 00000000..d7abc717 --- /dev/null +++ b/services/tests/resume.rs @@ -0,0 +1,53 @@ +// Copyright (C) 2024 Huawei Device Co., Ltd. +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +#![cfg(feature = "oh")] +use std::fs::File; +use std::time::Duration; + +use download_server::config::{Action, ConfigBuilder, Mode}; +use test_common::test_init; + +#[test] +fn sdv_start_resume() { + let file_path = "sdv_start_resume.txt"; + + let agent = test_init(); + let file = File::create(file_path).unwrap(); + let config = ConfigBuilder::new() + .action(Action::Download) + .mode(Mode::BackGround) + .file_spec(file) + .url("https://www.gitee.com/tiga-ultraman/downloadTests/releases/download/v1.01/test.txt") + .redirect(true) + .build(); + let task_id = agent.construct(config); + agent.start(task_id); + agent.subscribe(task_id); + agent.pause(task_id); + std::thread::sleep(std::time::Duration::from_secs(1)); + agent.resume(task_id); + ylong_runtime::block_on(async { + 'main: loop { + let messages = agent.pop_task_info(task_id); + for message in messages { + message.check_correct(); + if message.is_finished() { + break 'main; + } + } + ylong_runtime::time::sleep(Duration::from_secs(1)).await; + } + let file = File::open(file_path).unwrap(); + assert_eq!(1042003, file.metadata().unwrap().len()); + }) +} diff --git a/services/tests/search.rs b/services/tests/search.rs index b873c5a0..3b698703 100644 --- a/services/tests/search.rs +++ b/services/tests/search.rs @@ -10,7 +10,7 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. - +#![cfg(feature = "oh")] use std::time::{SystemTime, UNIX_EPOCH}; use download_server::config::{Action, ConfigBuilder, Mode}; diff --git a/services/tests/start.rs b/services/tests/start.rs index 0299f6d0..3ae8b331 100644 --- a/services/tests/start.rs +++ b/services/tests/start.rs @@ -10,22 +10,23 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. - +#![cfg(feature = "oh")] use std::fs::File; use std::time::Duration; use download_server::config::{Action, ConfigBuilder, Mode}; -use download_server::FileSpec; use test_common::test_init; #[test] fn sdv_start_basic() { + let file_path = "sdv_network_resume.txt"; + let agent = test_init(); - let file = File::create("sdv_network_resume.txt").unwrap(); + let file = File::create(file_path).unwrap(); let config = ConfigBuilder::new() .action(Action::Download) .mode(Mode::BackGround) - .file_spec(|v| v.push(FileSpec::user_file(&file))) + .file_spec(file) .url("https://www.gitee.com/tiga-ultraman/downloadTests/releases/download/v1.01/test.txt") .redirect(true) .build(); @@ -43,6 +44,7 @@ fn sdv_start_basic() { } ylong_runtime::time::sleep(Duration::from_secs(1)).await; } + let file = File::open(file_path).unwrap(); assert_eq!(1042003, file.metadata().unwrap().len()); }) } diff --git a/test/rustest/BUILD.gn b/test/rustest/BUILD.gn index 50dc1c3c..5714dd3b 100644 --- a/test/rustest/BUILD.gn +++ b/test/rustest/BUILD.gn @@ -21,7 +21,7 @@ ohos_rust_static_library("rust_request_test_common") { "./c:request_test", "//third_party/rust/crates/once_cell:lib", ] - + features = [ "oh" ] external_deps = [ "ipc:ipc_rust", "safwk:system_ability_fwk_rust", diff --git a/test/rustest/Cargo.toml b/test/rustest/Cargo.toml index 71711cd4..0d1669ca 100644 --- a/test/rustest/Cargo.toml +++ b/test/rustest/Cargo.toml @@ -16,11 +16,20 @@ name = "test_common" version = "0.1.0" edition = "2021" -[dependencies] +[features] +default = [] +oh = [ + "samgr", + "ipc", + "system_ability_fwk", + "download_server", +] + +[dependencies] ylong_runtime = { git = "https://gitee.com/openharmony/commonlibrary_rust_ylong_runtime", features = ["full"] } -download_server = { path = "../../services/" } -system_ability_fwk = { git = "https://gitee.com/openharmony/systemabilitymgr_safwk" } -samgr = { git = "https://gitee.com/openharmony/systemabilitymgr_samgr" } -ipc = { git = "https://gitee.com/openharmony/communication_ipc" } +download_server = { path = "../../services/", optional = true } +system_ability_fwk = { git = "https://gitee.com/openharmony/systemabilitymgr_safwk", optional = true } +samgr = { git = "https://gitee.com/openharmony/systemabilitymgr_samgr", optional = true } +ipc = { git = "https://gitee.com/openharmony/communication_ipc", optional = true } once_cell = "1.17.0" diff --git a/test/rustest/src/lib.rs b/test/rustest/src/lib.rs index 0c3a82ca..1aee9a4b 100644 --- a/test/rustest/src/lib.rs +++ b/test/rustest/src/lib.rs @@ -25,6 +25,7 @@ // limitations under the License. #![allow(unused)] #![allow(missing_docs)] +#![cfg(feature = "oh")] use std::collections::HashMap; use std::ffi::{c_char, CString}; @@ -122,6 +123,31 @@ impl RequestAgent { assert_eq!(ret, 0); } + pub fn pause(&self, task_id: u32) { + let mut data = MsgParcel::new(); + data.write_interface_token(SERVICE_TOKEN).unwrap(); + data.write(&0u32); + data.write(&format!("{}", task_id)).unwrap(); + let mut reply = self + .remote + .send_request(interface::PAUSE, &mut data) + .unwrap(); + let ret: i32 = reply.read().unwrap(); + assert_eq!(ret, 0); + } + + pub fn resume(&self, task_id: u32) { + let mut data = MsgParcel::new(); + data.write_interface_token(SERVICE_TOKEN).unwrap(); + data.write(&format!("{}", task_id)).unwrap(); + let mut reply = self + .remote + .send_request(interface::RESUME, &mut data) + .unwrap(); + let ret: i32 = reply.read().unwrap(); + assert_eq!(ret, 0); + } + pub fn search( &self, before: i64,