task 处理逻辑调整,添加测试, 增加no_oh 下测试

Signed-off-by: fqwert <yanglv2@huawei.com>
Change-Id: I8b45d616a55bdaa90046f7a079e81ff56a442980
Signed-off-by: fqwert <yanglv2@huawei.com>
This commit is contained in:
fqwert 2024-07-22 20:34:16 +08:00
parent 91d2306988
commit 0fb8e94302
40 changed files with 1513 additions and 837 deletions

View File

@ -18,9 +18,16 @@ edition = "2021"
license = "Apache-2.0"
[features]
default = []
default = ["oh"]
oh = []
oh = [
"hilog_rust",
"hisysevent",
"hitrace_meter_rust",
"samgr",
"ipc",
"system_ability_fwk",
]
[dependencies]
ylong_runtime = { git = "https://gitee.com/openharmony/commonlibrary_rust_ylong_runtime", features = ["full"] }
@ -31,14 +38,18 @@ ylong_http_client = { git = "https://gitee.com/openharmony/commonlibrary_rust_yl
"ylong_base",
] }
hilog_rust = { git = "https://gitee.com/openharmony/hiviewdfx_hilog" }
hisysevent = { git = "https://gitee.com/openharmony/hiviewdfx_hisysevent" }
hitrace_meter_rust = { git = "https://gitee.com/openharmony/hiviewdfx_hitrace" }
hilog_rust = { git = "https://gitee.com/openharmony/hiviewdfx_hilog", optional = true }
hisysevent = { git = "https://gitee.com/openharmony/hiviewdfx_hisysevent", optional = true }
hitrace_meter_rust = { git = "https://gitee.com/openharmony/hiviewdfx_hitrace", optional = true }
cxx = "1.0.115"
system_ability_fwk = { git = "https://gitee.com/openharmony/systemabilitymgr_safwk" }
samgr = { git = "https://gitee.com/openharmony/systemabilitymgr_samgr" }
ipc = { git = "https://gitee.com/openharmony/communication_ipc" }
system_ability_fwk = { git = "https://gitee.com/openharmony/systemabilitymgr_safwk", optional = true }
samgr = { git = "https://gitee.com/openharmony/systemabilitymgr_samgr", optional = true }
ipc = { git = "https://gitee.com/openharmony/communication_ipc", optional = true }
log = "0.4.22"
env_logger = "0.11.3"
mockall = { version = "0.12.1", features = ["nightly"] }
mockall_double = "0.3.1"
[dev-dependencies]
test_common = { path = "../test/rustest" }
test_common = { path = "../test/rustest", features = ["oh"] }

View File

@ -34,11 +34,14 @@ public:
rust::fn<void(const NetworkTaskManagerTx &task_manager)> notifyTaskManagerOnline,
rust::fn<void(const NetworkTaskManagerTx &task_manager)> notifyTaskManagerOffline);
~RequestNetCallbackStub();
int32_t NetAvailable(sptr<NetHandle> &netHandle) override;
int32_t NetLost(sptr<NetHandle> &netHandle) override;
int32_t NetUnavailable() override;
int32_t NetCapabilitiesChange(sptr<NetHandle> &netHandle, const sptr<NetAllCapabilities> &netAllCap) override;
private:
void HandleNetCap(const sptr<NetAllCapabilities> &netAllCap);
bool IsRoaming();
NetworkInner *networkNotifier_;
NetworkTaskManagerTx *task_manager_;

View File

@ -16,10 +16,13 @@
#include "c_request_database.h"
#include <securec.h>
#include <stdint.h>
#include <algorithm>
#include <cstdint>
#include <iostream>
#include <string>
#include <vector>
#include "base/request/request/common/include/log.h"
#include "cxx.h"

View File

@ -24,6 +24,7 @@
#include "net_conn_callback_stub.h"
#include "net_conn_client.h"
#include "net_specifier.h"
#include "refbase.h"
#ifdef REQUEST_TELEPHONY_CORE_SERVICE
#include "cellular_data_client.h"
@ -59,6 +60,58 @@ RequestNetCallbackStub::~RequestNetCallbackStub()
rust::Box<NetworkTaskManagerTx>::from_raw(task_manager_);
}
void RequestNetCallbackStub::HandleNetCap(const sptr<NetAllCapabilities> &netAllCap)
{
if (netAllCap->netCaps_.find(NetCap::NET_CAPABILITY_VALIDATED) == netAllCap->netCaps_.end()) {
networkNotifier_->notify_offline();
notifyTaskManagerOffline_(*task_manager_);
return;
}
for (auto bearerType : netAllCap->bearerTypes_) {
auto networkInfo = NetworkInfo();
if (bearerType == NetManagerStandard::NetBearType::BEARER_WIFI) {
networkInfo.network_type = NetworkType::Wifi;
networkInfo.is_metered = false;
networkInfo.is_roaming = false;
if (networkNotifier_->notify_online(networkInfo)) {
notifyTaskManagerOnline_(*task_manager_);
}
return;
} else if (bearerType == NetManagerStandard::NetBearType::BEARER_CELLULAR) {
networkInfo.network_type = NetworkType::Cellular;
networkInfo.is_metered = true;
networkInfo.is_roaming = this->IsRoaming();
if (networkNotifier_->notify_online(networkInfo)) {
notifyTaskManagerOnline_(*task_manager_);
}
return;
};
}
if (networkNotifier_->notify_online(NetworkInfo{
.network_type = NetworkType::Other,
.is_metered = false,
.is_roaming = false,
})) {
notifyTaskManagerOnline_(*task_manager_);
}
return;
}
int32_t RequestNetCallbackStub::NetAvailable(sptr<NetHandle> &netHandle)
{
sptr<NetAllCapabilities> netAllCap = sptr<NetAllCapabilities>::MakeSptr();
int32_t ret = NetConnClient::GetInstance().GetNetCapabilities(*netHandle, *netAllCap);
if (ret != 0) {
REQUEST_HILOGE("GetNetCapabilities failed, ret = %{public}d", ret);
return ret;
}
this->HandleNetCap(netAllCap);
return 0;
}
int32_t RequestNetCallbackStub::NetLost(sptr<NetHandle> &netHandle)
{
networkNotifier_->notify_offline();
@ -77,39 +130,7 @@ int32_t RequestNetCallbackStub::NetCapabilitiesChange(
sptr<NetHandle> &netHandle, const sptr<NetAllCapabilities> &netAllCap)
{
REQUEST_HILOGI("NetCapabilitiesChange");
if (netAllCap->netCaps_.find(NetCap::NET_CAPABILITY_VALIDATED) == netAllCap->netCaps_.end()) {
networkNotifier_->notify_offline();
return 0;
}
for (auto bearerType : netAllCap->bearerTypes_) {
if (bearerType == NetManagerStandard::NetBearType::BEARER_WIFI) {
if (networkNotifier_->notify_online(NetworkInfo{
.network_type = NetworkType::Wifi,
.is_metered = false,
.is_roaming = false,
})) {
notifyTaskManagerOnline_(*task_manager_);
}
return 0;
} else if (bearerType == NetManagerStandard::NetBearType::BEARER_CELLULAR) {
if (networkNotifier_->notify_online(NetworkInfo{
.network_type = NetworkType::Cellular,
.is_metered = true,
.is_roaming = this->IsRoaming(),
})) {
notifyTaskManagerOnline_(*task_manager_);
}
return 0;
};
}
if (networkNotifier_->notify_online(NetworkInfo{
.network_type = NetworkType::Other,
.is_metered = false,
.is_roaming = false,
})) {
notifyTaskManagerOnline_(*task_manager_);
}
this->HandleNetCap(netAllCap);
return 0;
}

View File

@ -22,24 +22,32 @@
clippy::enum_variant_names,
clippy::clone_on_copy
)]
#[macro_use]
mod hilog;
pub mod ability;
#[macro_use]
mod macros;
#[cfg(not(feature = "oh"))]
#[macro_use]
extern crate log;
cfg_oh! {
#[macro_use]
mod hilog;
mod trace;
mod service;
pub mod ability;
mod sys_event;
pub use service::interface;
pub use utils::form_item::FileSpec;
}
mod error;
mod manage;
mod service;
mod sys_event;
mod task;
#[cfg(feature = "oh")]
mod trace;
mod utils;
pub use service::interface;
pub use task::{config, info};
pub use utils::form_item::FileSpec;
cfg_oh! {
#[cfg(not(test))]
const LOG_LABEL: hilog_rust::HiLogLabel = hilog_rust::HiLogLabel {
log_type: hilog_rust::LogType::LogCore,
@ -68,3 +76,4 @@ mod tests {
fn SetAccessTokenPermission();
}
}
}

44
services/src/macros.rs Normal file
View File

@ -0,0 +1,44 @@
// Copyright (C) 2023 Huawei Device Co., Ltd.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
macro_rules! cfg_oh {
($($item:item)*) => {
$(
#[cfg(feature = "oh")]
$item
)*
}
}
macro_rules! cfg_not_oh {
($($item:item)*) => {
$(
#[cfg(not(feature = "oh"))]
$item
)*
}
}
#[cfg(not(feature = "oh"))]
macro_rules! cvt_res_error {
($res: expr, $($args:tt)*) => {{
match $res {
Ok(value) => value,
Err(e) => {
error!($($args)*);
error!("Error msg: {:?}", e);
return Err(e);
}
}
}}
}

View File

@ -295,7 +295,6 @@ impl RequestDb {
#[cxx::bridge(namespace = "OHOS::Request")]
mod ffi {
#[repr(i32)]
enum OS_ACCOUNT_SUBSCRIBE_TYPE {
INVALID_TYPE = -1,

View File

@ -75,7 +75,7 @@ extern "C" fn process_state_change_callback(uid: i32, state: i32, pid: i32) {
}
}
#[link(name = "download_server_cxx", kind = "static")]
#[cfg(feature = "oh")]
extern "C" {
fn RegisterAPPStateCallback(f: extern "C" fn(i32, i32, i32));
fn RegisterProcessStateCallback(f: extern "C" fn(i32, i32, i32));

View File

@ -96,7 +96,7 @@ fn update_system_cert(info: &Arc<RwLock<CertInfo>>) {
};
}
#[link(name = "download_server_cxx", kind = "static")]
#[cfg(feature = "oh")]
extern "C" {
pub(crate) fn GetUserCertsData() -> *const CRequestCerts;
pub(crate) fn FreeCertDataList(certs: *const CRequestCerts);

View File

@ -37,7 +37,7 @@ impl SystemProxyManager {
}
}
#[link(name = "download_server_cxx", kind = "static")]
#[cfg(feature = "oh")]
extern "C" {
pub(crate) fn RegisterProxySubscriber();
pub(crate) fn GetHost() -> CStringWrapper;

View File

@ -267,7 +267,8 @@ pub(crate) struct TaskQosInfo {
pub(crate) priority: u32,
}
#[link(name = "download_server_cxx", kind = "static")]
#[cfg(feature = "oh")]
extern "C" {
fn DeleteCTaskConfig(ptr: *const CTaskConfig);
fn DeleteCTaskInfo(ptr: *const CTaskInfo);
@ -415,6 +416,7 @@ mod test {
use crate::config::Mode;
use crate::task::info::State;
use crate::tests::{test_init, DB_LOCK};
use crate::utils::get_current_timestamp;
use crate::utils::task_id_generator::TaskIdGenerator;
#[test]
@ -473,8 +475,9 @@ mod test {
test_init();
let _lock = DB_LOCK.lock().unwrap();
let uid = 123456789;
let uid = get_current_timestamp() as u64;
let mut db = RequestDb::get_instance();
db.execute(&format!(
"INSERT INTO request_task (task_id, uid, state, mode) VALUES ({}, {}, {}, {})",
TaskIdGenerator::generate(),

View File

@ -21,7 +21,7 @@ impl TaskManager {
match Database::get_instance().get_task_info(task_id) {
Some(info) if info.uid() == uid => {
debug!("TaskManager Show: task info is {:?}", info);
info!("TaskManager Show: task info is {:?}", info);
Some(info)
}
_ => {

View File

@ -11,15 +11,19 @@
// See the License for the specific language governing permissions and
// limitations under the License.
pub(crate) mod account;
pub(crate) mod app_state;
pub(crate) mod config;
pub(crate) mod database;
pub(crate) mod events;
pub(crate) mod network;
pub(crate) mod notifier;
pub(crate) mod scheduler;
pub(crate) mod task_manager;
pub(crate) use config::{SystemConfig, SystemConfigManager};
cfg_oh! {
pub(crate) mod account;
pub(crate) mod app_state;
pub(crate) mod config;
pub(crate) mod database;
pub(crate) mod events;
pub(crate) mod notifier;
pub(crate) mod scheduler;
pub(crate) mod task_manager;
pub(crate) use config::{SystemConfig, SystemConfigManager};
pub(crate) use task_manager::TaskManager;
}
mod network;
pub(crate) use network::Network;
pub(crate) use task_manager::TaskManager;

View File

@ -10,16 +10,23 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::{Arc, RwLock, RwLockReadGuard};
use std::sync::{Arc, RwLock};
use cxx::UniquePtr;
use ffi::NetworkRegistry;
pub(crate) use ffi::{NetworkInfo, NetworkType};
use NetworkState::{Offline, Online};
use super::database::RequestDb;
use super::events::TaskManagerEvent;
use super::task_manager::TaskManagerTx;
cfg_oh! {
use super::database::RequestDb;
use super::events::TaskManagerEvent;
use super::task_manager::TaskManagerTx;
}
cfg_not_oh! {
use mockall::automock;
}
use crate::task::config::{NetworkConfig, TaskConfig};
use crate::task::info::State;
use crate::task::reason::Reason;
@ -28,6 +35,7 @@ use crate::utils::get_current_timestamp;
#[derive(Clone)]
pub struct Network {
inner: NetworkInner,
#[cfg(feature = "oh")]
_registry: Arc<UniquePtr<NetworkRegistry>>,
}
@ -38,16 +46,25 @@ pub(crate) enum NetworkState {
}
impl Network {
#![allow(unused)]
pub(crate) fn new() -> Self {
Self {
inner: NetworkInner::new(),
#[cfg(feature = "oh")]
_registry: Arc::new(UniquePtr::null()),
}
}
pub(crate) fn is_online(&self) -> bool {
matches!(*self.inner.state.read().unwrap(), Online(_))
}
pub(crate) fn state(&self) -> RwLockReadGuard<NetworkState> {
self.inner.state.read().unwrap()
pub(crate) fn state(&self) -> NetworkState {
self.inner.state.read().unwrap().clone()
}
pub(crate) fn satisfied_state(&self, config: &TaskConfig) -> bool {
match &*self.state() {
match self.state() {
// Handles in `RequestTask::network_online`.
NetworkState::Offline => true,
NetworkState::Online(info) => {
@ -87,8 +104,11 @@ impl Network {
}
}
pub(crate) fn register_network_change(task_manager: TaskManagerTx) -> Network {
pub(crate) fn register_network_change(
#[cfg(feature = "oh")] task_manager: TaskManagerTx,
) -> Network {
let inner = NetworkInner::new();
#[cfg(feature = "oh")]
let registry = ffi::RegisterNetworkChange(
Box::new(inner.clone()),
Box::new(NetworkTaskManagerTx {
@ -105,6 +125,7 @@ pub(crate) fn register_network_change(task_manager: TaskManagerTx) -> Network {
.send_event(TaskManagerEvent::network_offline());
},
);
#[cfg(feature = "oh")]
if registry.is_null() {
error!("RegisterNetworkChange failed sleep 1s and retry");
#[cfg(not(test))]
@ -115,6 +136,7 @@ pub(crate) fn register_network_change(task_manager: TaskManagerTx) -> Network {
}
Network {
inner,
#[cfg(feature = "oh")]
_registry: Arc::new(registry),
}
}
@ -126,6 +148,7 @@ pub struct NetworkInner {
}
pub struct NetworkTaskManagerTx {
#[cfg(feature = "oh")]
inner: TaskManagerTx,
}
@ -144,6 +167,8 @@ impl NetworkInner {
*self.last_notify_time.write().unwrap() = get_current_timestamp();
*state = Offline;
#[cfg(feature = "oh")]
self.update_database(NetworkState::Offline);
}
}
@ -158,6 +183,7 @@ impl NetworkInner {
*state = Online(info.clone());
#[cfg(feature = "oh")]
self.update_database(Online(info));
} else {
info!("Network change with the same: {:?}", info);
@ -165,6 +191,7 @@ impl NetworkInner {
ret
}
#[cfg(feature = "oh")]
fn update_database(&self, state: NetworkState) {
let mut database = RequestDb::get_instance();
match state {
@ -177,6 +204,7 @@ impl NetworkInner {
}
}
#[cfg(feature = "oh")]
impl RequestDb {
fn update_for_network_available(&mut self, info: &NetworkInfo) {
let mut sql = format!(
@ -208,8 +236,9 @@ impl RequestDb {
fn update_for_network_unavailable(&mut self, info: &NetworkInfo) {
let mut sql = format!(
"UPDATE request_task SET state = {}, reason = {} WHERE ((state = {} AND reason = {} ) OR state = {} OR state = {})",
"UPDATE request_task SET state = {}, retry = {}, reason = {} WHERE ((state = {} AND reason = {} ) OR state = {} OR state = {})",
State::Waiting.repr,
1,
Reason::UnsupportedNetworkType.repr,
State::Waiting.repr,
Reason::RunningTaskMeetLimits.repr,
@ -248,8 +277,9 @@ impl RequestDb {
fn update_for_network_offline(&mut self) {
let sql = format!(
"UPDATE request_task SET state = {}, reason = {} WHERE (state = {} AND reason = {} OR state = {} OR state = {})",
"UPDATE request_task SET state = {}, retry = {}, reason = {} WHERE (state = {} AND reason = {} OR state = {} OR state = {})",
State::Waiting.repr,
1,
Reason::UnsupportedNetworkType.repr,
State::Waiting.repr,
Reason::RunningTaskMeetLimits.repr,
@ -304,6 +334,7 @@ mod ffi {
}
}
#[cfg(feature = "oh")]
#[cfg(test)]
mod test {
@ -338,7 +369,7 @@ mod test {
assert!(network.is_online());
assert_eq!(
*network.state(),
network.state(),
Online(NetworkInfo {
network_type: NetworkType::Wifi,
is_metered: false,
@ -357,7 +388,7 @@ mod test {
assert!(network.is_online());
assert_eq!(
*network.state(),
network.state(),
Online(NetworkInfo {
network_type: NetworkType::Cellular,
is_metered: true,
@ -374,7 +405,7 @@ mod test {
let network = register_network_change(task_manager_tx);
assert!(network.is_online());
assert_eq!(
*network.state(),
network.state(),
Online(NetworkInfo {
network_type: NetworkType::Wifi,
is_metered: false,
@ -598,3 +629,74 @@ mod test {
assert!(v.contains(&task_id));
}
}
#[cfg(not(feature = "oh"))]
#[cfg(test)]
mod test {
use core::{net, time};
use std::fs::File;
use std::future::join;
use std::io::{Seek, SeekFrom, Write};
use std::sync::Arc;
use mockall_double::double;
use ylong_runtime::io::AsyncSeekExt;
use super::NetworkType;
use crate::config::{Action, ConfigBuilder, Mode, TaskConfig};
use crate::info::State;
use crate::manage::Network;
use crate::task::download;
use crate::task::download::download_inner;
use crate::task::reason::Reason;
use crate::task::request_task::{check_config, RequestTask, TaskError, TaskPhase};
use crate::utils::form_item::FileSpec;
const GITEE_FILE_LEN: u64 = 1042003;
const FS_FILE_LEN: u64 = 274619168;
fn build_task(config: TaskConfig, network: Network) -> Arc<RequestTask> {
let (files, client) = check_config(&config).unwrap();
let task = Arc::new(RequestTask::new(config, files, client, network));
task.status.lock().unwrap().state = State::Initialized;
task
}
fn init() {
let _ = env_logger::builder().is_test(true).try_init();
std::fs::create_dir("test_files/");
}
#[test]
fn ut_network_status_error() {
init();
let file_path = "test_files/ut_network_status_error.txt";
let file = File::create(file_path).unwrap();
let config = ConfigBuilder::new()
.action(Action::Download)
.mode(Mode::BackGround)
.file_spec(file)
.url("https://www.gitee.com/tiga-ultraman/downloadTests/releases/download/v1.01/test.txt")
.redirect(true)
.metered(false)
.retry(true)
.build();
let network = Network::new();
network.inner.notify_online(super::NetworkInfo {
network_type: NetworkType::Wifi,
is_metered: true,
is_roaming: false,
});
let task = build_task(config, network);
ylong_runtime::block_on(async {
let err = download_inner(task.clone()).await.unwrap_err();
assert_eq!(task.status.lock().unwrap().state, State::Waiting);
assert_eq!(
task.status.lock().unwrap().reason,
Reason::UnsupportedNetworkType
);
assert_eq!(err, TaskError::Waiting(TaskPhase::UserAbort));
});
}
}

View File

@ -21,6 +21,7 @@ use std::sync::Arc;
use keeper::SAKeeper;
pub(crate) use notify_task::NotifyTask;
use ylong_runtime::sync::oneshot;
use crate::ability::SYSTEM_CONFIG_MANAGER;
use crate::error::ErrorCode;
@ -52,6 +53,7 @@ pub(crate) struct RunningQueue {
runcount_manager: RunCountManagerEntry,
client_manager: ClientManagerEntry,
network: Network,
locks: HashMap<u32, oneshot::Receiver<()>>,
}
impl RunningQueue {
@ -80,6 +82,7 @@ impl RunningQueue {
runcount_manager,
client_manager,
network,
locks: HashMap::new(),
}
}
@ -169,6 +172,11 @@ impl RunningQueue {
// If the task is not in the current running queue, retrieve
// the corresponding task from the database and start it.
let system_config = unsafe { SYSTEM_CONFIG_MANAGER.assume_init_ref().system_config() };
if let Some(recv) = self.locks.remove(&task_id) {
let _ = recv.await;
}
let task = match Database::get_instance()
.get_task(
task_id,
@ -201,14 +209,15 @@ impl RunningQueue {
continue;
}
};
let (lock, rx) = oneshot::channel();
self.locks.insert(task_id, rx);
let keeper = self.keeper.clone();
let tx = self.tx.clone();
let runcount_manager = self.runcount_manager.clone();
task.speed_limit(qos_direction.direction() as u64);
new_queue.insert((uid, task_id), task.clone());
let task = RunningTask::new(runcount_manager, task, tx, keeper);
if !task.satisfied() {
let task = RunningTask::new(runcount_manager, task, tx, keeper, lock);
if !task.satisfied().await {
continue;
}
self.join_handles.push(runtime_spawn(async move {

View File

@ -11,24 +11,18 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::io::SeekFrom;
use std::ops::Deref;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use ylong_runtime::io::AsyncSeekExt;
use ylong_runtime::sync::oneshot;
use crate::manage::database::Database;
use crate::manage::events::{TaskEvent, TaskManagerEvent};
use crate::manage::notifier::Notifier;
use crate::manage::scheduler::queue::keeper::SAKeeper;
use crate::manage::scheduler::queue::notify_task::NotifyTask;
use crate::manage::task_manager::TaskManagerTx;
use crate::service::runcount::{RunCountEvent, RunCountManagerEntry};
use crate::task::config::Action;
use crate::task::download::download;
use crate::task::info::{State, UpdateInfo};
use crate::task::reason::Reason;
use crate::task::request_task::RequestTask;
use crate::task::upload::upload;
@ -36,6 +30,7 @@ pub(crate) struct RunningTask {
runcount_manager: RunCountManagerEntry,
task: NotifyTask,
tx: TaskManagerTx,
lock: Option<oneshot::Sender<()>>,
// `_keeper` is never used when executing the task.
_keeper: SAKeeper,
}
@ -46,183 +41,30 @@ impl RunningTask {
task: Arc<RequestTask>,
tx: TaskManagerTx,
keeper: SAKeeper,
lock: oneshot::Sender<()>,
) -> Self {
// Task start to run, then running count +1.
runcount_manager.send_event(RunCountEvent::change_runcount(1));
{
let mut task_status = task.status.lock().unwrap();
let from_state = task_status.state;
if from_state == State::Waiting
&& (task_status.reason == Reason::NetworkOffline
|| task_status.reason == Reason::UnsupportedNetworkType)
{
info!(
"Retry a waiting task with NetworkOffline/UnsupportedNetworkType,
uid:{}, task_id:{}",
task.conf.common_data.uid, task.conf.common_data.task_id
);
task.retry.store(true, Ordering::SeqCst);
task.tries.fetch_add(1, Ordering::SeqCst);
let mut progress = task.progress.lock().unwrap();
RequestTask::change_status(
&mut task_status,
&mut progress,
State::Retrying,
Reason::Default,
);
task.set_code(progress.common_data.index, Reason::Default, false);
task.resume.store(true, Ordering::SeqCst);
let codes_guard = task.code.lock().unwrap();
let update_info = UpdateInfo {
mtime: task_status.mtime,
reason: task_status.reason.repr,
progress: progress.clone(),
each_file_status: RequestTask::get_each_file_status_by_code(
&codes_guard,
&task.conf.file_specs,
),
tries: task.tries.load(Ordering::SeqCst),
mime_type: task.mime_type.lock().unwrap().clone(),
};
Database::get_instance().update_task(task.task_id(), update_info);
} else {
if from_state == State::Paused {
let notify_data = task.build_notify_data();
Notifier::resume(&task.client_manager, notify_data);
}
let mut progress = task.progress.lock().unwrap();
RequestTask::change_status(
&mut task_status,
&mut progress,
State::Running,
Reason::Default,
);
task.set_code(progress.common_data.index, Reason::Default, false);
if from_state.check_resume() {
task.resume.store(true, Ordering::SeqCst);
}
let codes_guard = task.code.lock().unwrap();
let update_info = UpdateInfo {
mtime: task_status.mtime,
reason: task_status.reason.repr,
progress: progress.clone(),
each_file_status: RequestTask::get_each_file_status_by_code(
&codes_guard,
&task.conf.file_specs,
),
tries: task.tries.load(Ordering::SeqCst),
mime_type: task.mime_type.lock().unwrap().clone(),
};
Database::get_instance().update_task(task.task_id(), update_info);
}
}
Self {
runcount_manager,
task: NotifyTask::new(task),
lock: Some(lock),
tx,
_keeper: keeper,
}
}
pub(crate) async fn run(&self) {
let task = self;
let mut index = 0;
info!("run task: {}", task.conf.common_data.task_id);
let action = task.conf.common_data.action;
pub(crate) async fn run(self) {
let action = self.conf.common_data.action;
match action {
Action::Download => loop {
task.set_code(0, Reason::Default, true);
download(task.task.clone()).await;
let mut task_status = task.status.lock().unwrap();
if !task_status.state.is_doing() {
break;
}
let mut progress = self.progress.lock().unwrap();
let codes_guard = task.code.lock().unwrap();
let reason = codes_guard.first();
match reason {
Some(reason) => {
if *reason != Reason::Default {
RequestTask::change_status(
&mut task_status,
&mut progress,
State::Failed,
*reason,
);
let update_info = UpdateInfo {
mtime: task_status.mtime,
reason: task_status.reason.repr,
progress: progress.clone(),
each_file_status: RequestTask::get_each_file_status_by_code(
&codes_guard,
&task.conf.file_specs,
),
tries: task.tries.load(Ordering::SeqCst),
mime_type: task.mime_type.lock().unwrap().clone(),
};
Database::get_instance().update_task(task.task_id(), update_info);
break;
}
}
None => break,
}
},
Action::Download => {
download(self.task.clone()).await;
}
Action::Upload => {
let state = task.status.lock().unwrap().state;
if state == State::Retrying {
index = {
let mut progress_guard = task.progress.lock().unwrap();
let index = progress_guard.common_data.index;
progress_guard.common_data.total_processed -=
progress_guard.processed[index];
progress_guard.processed[index] = 0;
index
};
let file = task.files.get_mut(index).unwrap();
let mut begins = task.conf.common_data.begins;
let (is_partial_upload, _) = task.get_upload_info(index);
if !is_partial_upload {
begins = 0;
}
if let Err(e) = file.seek(SeekFrom::Start(begins)).await {
task.set_code(index, Reason::IoError, false);
error!("seek err is {:?}", e);
}
}
upload(task.task.clone()).await;
upload(self.task.clone()).await;
}
_ => {}
}
info!(
"task run end: {}, state: {:?}, reason: {:?}",
task.conf.common_data.task_id,
task.status.lock().unwrap().state,
task.code.lock().unwrap()[index]
);
let (state, reason) = {
let status = self.task.status.lock().unwrap();
(status.state, status.reason)
};
// Only tasks that cannot run automatically need to be removed from QoS
if state == State::Waiting && reason == Reason::RunningTaskMeetLimits {
return;
}
// UserOperation tasks has been removed from qos in TaskManager
if reason == Reason::UserOperation {
return;
}
let _ = self
.tx
.send_event(TaskManagerEvent::Task(TaskEvent::Finished(
self.task_id(),
self.uid(),
)));
}
}
@ -236,6 +78,14 @@ impl Deref for RunningTask {
impl Drop for RunningTask {
fn drop(&mut self) {
info!("Task {} drop", self.task_id());
self.tx
.send_event(TaskManagerEvent::Task(TaskEvent::Finished(
self.task_id(),
self.uid(),
)));
self.lock.take().unwrap().send(()).unwrap();
// Task finishes running, then running count -1.
self.runcount_manager
.send_event(RunCountEvent::change_runcount(-1));

View File

@ -221,7 +221,6 @@ impl Client {
.await;
}
}
debug!("Client handle message done");
}
}

View File

@ -18,18 +18,20 @@ use ylong_http_client::{
Certificate, HttpClientError, Proxy, PubKeyPins, Redirect, Timeout, TlsVersion,
};
use crate::manage::SystemConfig;
cfg_oh! {
use crate::manage::SystemConfig;
use crate::utils::url_policy::check_url_domain;
}
use crate::task::config::{Action, TaskConfig};
use crate::task::files::{check_atomic_convert_path, convert_path};
use crate::task::ATOMIC_SERVICE;
use crate::utils::url_policy::check_url_domain;
const CONNECT_TIMEOUT: u64 = 60;
const SECONDS_IN_ONE_WEEK: u64 = 7 * 24 * 60 * 60;
pub(crate) fn build_client(
config: &TaskConfig,
mut system: SystemConfig,
#[cfg(feature = "oh")] mut system: SystemConfig,
) -> Result<Client, Box<dyn Error + Send + Sync>> {
let mut client = Client::builder()
.connect_timeout(Timeout::from_secs(CONNECT_TIMEOUT))
@ -45,6 +47,7 @@ pub(crate) fn build_client(
}
// Set HTTP proxy.
#[cfg(feature = "oh")]
if let Some(proxy) = build_task_proxy(config)? {
client = client.proxy(proxy);
} else if let Some(proxy) = build_system_proxy(&system)? {
@ -55,6 +58,7 @@ pub(crate) fn build_client(
// redirected to HTTPS.
// Set system certs.
#[cfg(feature = "oh")]
if let Some(certs) = system.certs.take() {
for cert in certs.into_iter() {
client = client.add_root_certificate(cert)
@ -79,6 +83,7 @@ pub(crate) fn build_client(
"ApiPolicy Domain check, task_id is {}, bundle is {}, domian_type is {}, url is {}",
config.common_data.task_id, &config.bundle, &domain_type, &config.url
);
#[cfg(feature = "oh")]
if let Some(is_accessed) = check_url_domain(&config.bundle, &domain_type, &config.url) {
if !is_accessed {
error!(
@ -96,8 +101,12 @@ pub(crate) fn build_client(
);
}
let interceptors = DomainInterceptor::new(config.bundle.clone(), domain_type);
client = client.interceptor(interceptors);
#[cfg(feature = "oh")]
{
let interceptors = DomainInterceptor::new(config.bundle.clone(), domain_type);
client = client.interceptor(interceptors);
}
info!(
"add interceptor domain check, tid: {}",
config.common_data.task_id
@ -138,6 +147,7 @@ fn build_task_certificate_pins(
)))
}
#[cfg(feature = "oh")]
fn build_system_proxy(
system: &SystemConfig,
) -> Result<Option<Proxy>, Box<dyn Error + Send + Sync>> {
@ -207,6 +217,7 @@ impl DomainInterceptor {
}
}
#[cfg(feature = "oh")]
impl Interceptor for DomainInterceptor {
/// Intercepts the redirect request.
fn intercept_redirect_request(&self, request: &Request) -> Result<(), HttpClientError> {

View File

@ -16,7 +16,10 @@ use std::fs::File;
use std::os::fd::FromRawFd;
pub use ffi::{Action, Mode};
use ipc::parcel::Serialize;
cfg_oh! {
use ipc::parcel::Serialize;
}
use crate::utils::c_wrapper::{CFileSpec, CFormItem, CStringWrapper};
use crate::utils::form_item::{FileSpec, FormItem};
@ -227,7 +230,7 @@ impl Default for TaskConfig {
token: "xxx".to_string(),
proxy: "".to_string(),
extras: Default::default(),
version: Version::API9,
version: Version::API10,
form_items: vec![],
file_specs: vec![],
body_file_paths: vec![],
@ -240,8 +243,8 @@ impl Default for TaskConfig {
action: Action::Download,
mode: Mode::BackGround,
cover: false,
network_config: NetworkConfig::Wifi,
metered: true,
network_config: NetworkConfig::Any,
metered: false,
roaming: false,
retry: false,
redirect: true,
@ -275,9 +278,15 @@ impl ConfigBuilder {
self
}
/// set version
pub fn version(&mut self, version: u8) -> &mut Self {
self.inner.version = version.into();
self
}
/// Set title
pub fn file_spec<F: Fn(&mut Vec<FileSpec>)>(&mut self, op: F) -> &mut Self {
op(&mut self.inner.file_specs);
pub fn file_spec(&mut self, file: File) -> &mut Self {
self.inner.file_specs.push(FileSpec::user_file(file));
self
}
/// Set action
@ -332,8 +341,33 @@ impl ConfigBuilder {
self.inner.common_data.redirect = redirect;
self
}
/// begins
pub fn begins(&mut self, begins: u64) -> &mut Self {
self.inner.common_data.begins = begins;
self
}
/// ends
pub fn ends(&mut self, ends: u64) -> &mut Self {
self.inner.common_data.ends = ends as i64;
self
}
/// method
pub fn method(&mut self, metered: &str) -> &mut Self {
self.inner.method = metered.to_string();
self
}
/// retry
pub fn retry(&mut self, retry: bool) -> &mut Self {
self.inner.common_data.retry = retry;
self
}
}
#[cfg(feature = "oh")]
impl Serialize for TaskConfig {
fn serialize(&self, parcel: &mut ipc::parcel::MsgParcel) -> ipc::IpcResult<()> {
parcel.write(&(self.common_data.action.repr as u32))?;

View File

@ -11,16 +11,19 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::io::SeekFrom;
use std::pin::Pin;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::task::{Context, Poll};
use ylong_http_client::async_impl::{DownloadOperator, Downloader, Response};
use ylong_http_client::{HttpClientError, SpeedLimit, Timeout};
use ylong_http_client::{ErrorKind, HttpClientError, SpeedLimit, Timeout};
use ylong_runtime::io::AsyncSeekExt;
use super::operator::TaskOperator;
use super::reason::Reason;
use super::request_task::{TaskError, TaskPhase};
use crate::task::info::State;
use crate::task::request_task::RequestTask;
#[cfg(feature = "oh")]
@ -37,15 +40,6 @@ impl DownloadOperator for TaskOperator {
cx: &mut Context<'_>,
data: &[u8],
) -> Poll<Result<usize, HttpClientError>> {
if self.task.range_request.load(Ordering::SeqCst) {
if self.task.range_response.load(Ordering::SeqCst) {
return self.poll_write_file(cx, data, 0);
}
// write partial response data
let begins = self.task.conf.common_data.begins;
let ends = self.task.conf.common_data.ends;
return self.poll_write_partial_file(cx, data, begins, ends);
}
self.poll_write_file(cx, data, 0)
}
@ -74,86 +68,514 @@ pub(crate) fn build_downloader(
}
pub(crate) async fn download(task: Arc<RequestTask>) {
download_inner(task.clone()).await;
loop {
if let Err(e) = download_inner(task.clone()).await {
match e {
TaskError::Waiting(phase) => match phase {
TaskPhase::NeedRetry => {
continue;
}
TaskPhase::UserAbort => {
let state = task.status.lock().unwrap().state;
if state == State::Running {
error!("task {} state is running with user abort", task.task_id());
}
}
TaskPhase::NetworkOffline => {}
},
TaskError::Failed(reason) => {
task.change_task_status(State::Failed, reason);
}
}
}
break;
}
use hisysevent::{build_number_param, build_str_param};
#[cfg(feature = "oh")]
{
use hisysevent::{build_number_param, build_str_param};
use crate::sys_event::SysEvent;
// `unwrap` for propagating panics among threads.
use crate::sys_event::SysEvent;
let reason = *task
.code
.lock()
.unwrap()
.first()
.unwrap_or(&Reason::Default);
// If `Reason` is not `Default`a records this sys event.
let reason = *task
.code
.lock()
.unwrap()
.first()
.unwrap_or(&Reason::Default);
// If `Reason` is not `Default`a records this sys event.
if reason != Reason::Default {
SysEvent::task_fault()
.param(build_str_param!(crate::sys_event::TASKS_TYPE, "DOWNLOAD"))
.param(build_number_param!(crate::sys_event::TOTAL_FILE_NUM, 1))
.param(build_number_param!(crate::sys_event::FAIL_FILE_NUM, 1))
.param(build_number_param!(crate::sys_event::SUCCESS_FILE_NUM, 0))
.param(build_number_param!(
crate::sys_event::ERROR_INFO,
reason.repr as i32
))
.write();
if reason != Reason::Default {
SysEvent::task_fault()
.param(build_str_param!(crate::sys_event::TASKS_TYPE, "DOWNLOAD"))
.param(build_number_param!(crate::sys_event::TOTAL_FILE_NUM, 1))
.param(build_number_param!(crate::sys_event::FAIL_FILE_NUM, 1))
.param(build_number_param!(crate::sys_event::SUCCESS_FILE_NUM, 0))
.param(build_number_param!(
crate::sys_event::ERROR_INFO,
reason.repr as i32
))
.write();
}
}
}
async fn download_inner(task: Arc<RequestTask>) {
debug!(
"begin download task, tid: {}",
task.conf.common_data.task_id
);
impl RequestTask {
async fn prepare_download(&self) -> Result<(), TaskError> {
let file = self.files.get_mut(0).unwrap();
file.seek(SeekFrom::End(0));
let downloaded = file.metadata().await?.len() as usize;
let mut progress = self.progress.lock().unwrap();
progress.common_data.index = 0;
progress.common_data.total_processed = downloaded;
progress.common_data.state = State::Running.repr;
progress.processed = vec![downloaded];
progress.sizes = vec![-1];
Ok(())
}
}
pub(crate) async fn download_inner(task: Arc<RequestTask>) -> Result<(), TaskError> {
// Ensures `_trace` can only be freed when this function exits.
#[cfg(feature = "oh")]
let _trace = Trace::new("download file");
task.prepare_running();
task.prepare_download().await?;
let response = {
let request = match task.build_download_request().await {
Some(request) => request,
None => return,
};
info!("download task {} start running", task.task_id());
#[cfg(feature = "oh")]
{
let name = task.conf.file_specs[0].path.as_str();
let download = task.progress.lock().unwrap().processed[0];
task.range_response.store(false, Ordering::SeqCst);
task.range_request.store(false, Ordering::SeqCst);
let request = task.build_download_request().await?;
// Ensures `_trace` can only be freed when this function exits.
let _trace = Trace::new(&format!(
"download file name: {name} downloaded size: {download}"
));
let response = task.client.request(request).await;
match response.as_ref() {
Ok(response) => {
let status_code = response.status();
#[cfg(feature = "oh")]
task.notify_response(response);
info!(
"task {} get http response code {}",
task.conf.common_data.task_id, status_code,
);
if status_code.is_server_error()
|| (status_code.as_u16() != 408 && status_code.is_client_error())
|| status_code.is_redirection()
{
return Err(TaskError::Failed(Reason::ProtocolError));
}
if status_code.as_u16() == 408 {
if task.tries.load(Ordering::SeqCst) < 2 {
task.tries.fetch_add(1, Ordering::SeqCst);
info!("task {} server timeout", task.task_id());
return Err(TaskError::Waiting(TaskPhase::NeedRetry));
} else {
info!("task {} retry 3 times", task.task_id());
return Err(TaskError::Failed(Reason::ProtocolError));
}
}
if status_code.as_u16() == 200 {
if task.require_range() {
info!("task {} server not support range request", task.task_id());
return Err(TaskError::Failed(Reason::UnsupportedRangeRequest));
}
let file = task.files.get(0).unwrap();
let has_downloaded = file.metadata().await?.len() > 0;
if has_downloaded {
error!("task {} file not cleared", task.task_id());
task.clear_downloaded_file().await?;
}
}
}
Err(e) => {
error!("Task {} {:?}", task.task_id(), e);
task.client.request(request).await
match e.error_kind() {
ErrorKind::Timeout => return Err(TaskError::Failed(Reason::ContinuousTaskTimeout)),
ErrorKind::Request => return Err(TaskError::Failed(Reason::RequestError)),
ErrorKind::Redirect => return Err(TaskError::Failed(Reason::RedirectError)),
ErrorKind::Connect | ErrorKind::ConnectionUpgrade => {
if task.tries.load(Ordering::SeqCst) < 2 {
task.tries.fetch_add(1, Ordering::SeqCst);
return Err(TaskError::Waiting(TaskPhase::NeedRetry));
}
info!("task {} retry 3 times", task.task_id());
if e.is_dns_error() {
return Err(TaskError::Failed(Reason::Dns));
} else if e.is_tls_error() {
return Err(TaskError::Failed(Reason::Ssl));
} else {
return Err(TaskError::Failed(Reason::Tcp));
}
}
ErrorKind::BodyTransfer => return task.handle_body_transfer_error().await,
_ => {
if format!("{}", e).contains("No space left on device") {
return Err(TaskError::Failed(Reason::InsufficientSpace));
} else {
return Err(TaskError::Failed(Reason::OthersError));
}
}
};
}
};
task.record_response_header(&response);
if !task.handle_response_error(&response).await {
error!("response error");
return;
}
let response = response.unwrap();
if !task.get_file_info(&response) {
return;
{
let mut guard = task.progress.lock().unwrap();
guard.extras.clear();
for (k, v) in response.headers() {
if let Ok(value) = v.to_string() {
guard.extras.insert(k.to_string().to_lowercase(), value);
}
}
}
task.get_file_info(&response)?;
let mut downloader = build_downloader(task.clone(), response);
let result = downloader.download().await;
if !task.handle_download_error(&result).await {
error!("handle_download_error");
return;
if let Err(e) = downloader.download().await {
return task.handle_download_error(e).await;
}
let file = task.files.get_mut(0).unwrap();
file.sync_all().await?;
// Makes sure all the data has been written to the target file.
if let Some(file) = task.files.get(0) {
let _ = file.sync_all().await;
}
task.change_task_status(State::Completed, Reason::Default);
info!("task {} download success", task.task_id());
Ok(())
}
#[cfg(not(feature = "oh"))]
#[cfg(test)]
mod test {
use core::time;
use std::fs::File;
use std::io::{Seek, SeekFrom, Write};
use std::sync::Arc;
use ylong_runtime::io::AsyncSeekExt;
use crate::config::{Action, ConfigBuilder, Mode, TaskConfig};
use crate::info::State;
use crate::manage::Network;
use crate::task::download::{download_inner, TaskPhase};
use crate::task::reason::Reason;
use crate::task::request_task::{check_config, RequestTask, TaskError};
const GITEE_FILE_LEN: u64 = 1042003;
const FS_FILE_LEN: u64 = 274619168;
fn build_task(config: TaskConfig) -> Arc<RequestTask> {
let (files, client) = check_config(&config).unwrap();
let network = Network::new();
let task = Arc::new(RequestTask::new(config, files, client, network));
task.status.lock().unwrap().state = State::Initialized;
task
}
fn init() {
let _ = env_logger::builder().is_test(true).try_init();
let _ = std::fs::create_dir("test_files/");
}
#[test]
fn ut_download_basic() {
init();
let file_path = "test_files/ut_download_basic.txt";
let file = File::create(file_path).unwrap();
let config = ConfigBuilder::new()
.action(Action::Download)
.mode(Mode::BackGround)
.file_spec(file)
.url("https://www.gitee.com/tiga-ultraman/downloadTests/releases/download/v1.01/test.txt")
.redirect(true)
.build();
let task = build_task(config);
ylong_runtime::block_on(async {
download_inner(task).await.unwrap();
let file = File::open(file_path).unwrap();
assert_eq!(GITEE_FILE_LEN, file.metadata().unwrap().len());
});
}
#[test]
fn ut_download_resume() {
init();
let file_path = "test_files/ut_download_resume.txt";
let mut file = File::create(file_path).unwrap();
file.write(&[0; GITEE_FILE_LEN as usize - 10000]).unwrap();
let config = ConfigBuilder::new()
.action(Action::Download)
.mode(Mode::BackGround)
.file_spec(file)
.url("https://www.gitee.com/tiga-ultraman/downloadTests/releases/download/v1.01/test.txt")
.redirect(true)
.build();
let task = build_task(config);
ylong_runtime::block_on(async {
download_inner(task).await.unwrap();
let file = File::open(file_path).unwrap();
assert_eq!(GITEE_FILE_LEN, file.metadata().unwrap().len());
});
}
#[test]
fn ut_download_not_support_range() {
init();
let file_path = "test_files/ut_download_not_support_range.txt";
let file = File::create(file_path).unwrap();
let config = ConfigBuilder::new()
.action(Action::Download)
.mode(Mode::BackGround)
.file_spec(file)
.url("https://www.gitee.com/tiga-ultraman/downloadTests/releases/download/v1.01/test.txt")
.redirect(true)
.begins(5000)
.build();
let task = build_task(config);
ylong_runtime::block_on(async {
let res = download_inner(task).await.unwrap_err();
assert_eq!(res, TaskError::Failed(Reason::UnsupportedRangeRequest));
let file = File::open(file_path).unwrap();
assert_eq!(0, file.metadata().unwrap().len());
});
}
#[test]
fn ut_download_resume_not_support_range() {
init();
let file_path = "test_files/ut_download_resume_not_support_range.txt";
let file = File::create(file_path).unwrap();
let config = ConfigBuilder::new()
.action(Action::Download)
.mode(Mode::BackGround)
.file_spec(file)
.url("https://www.gitee.com/tiga-ultraman/downloadTests/releases/download/v1.01/test.txt")
.redirect(true)
.build();
let task = build_task(config);
ylong_runtime::block_on(async {
let clone_task = task.clone();
ylong_runtime::spawn(async move {
ylong_runtime::time::sleep(time::Duration::from_secs(2)).await;
clone_task.status.lock().unwrap().state = State::Waiting;
});
let err = download_inner(task.clone()).await.unwrap_err();
assert_eq!(err, TaskError::Waiting(TaskPhase::UserAbort));
let file = task.files.get_mut(0).unwrap();
file.set_len(10000).await.unwrap();
file.seek(SeekFrom::End(0));
download_inner(task.clone()).await.unwrap();
let file = File::open(file_path).unwrap();
assert_eq!(GITEE_FILE_LEN, file.metadata().unwrap().len());
});
}
#[test]
fn ut_download_not_support_range_resume() {
init();
let file_path = "test_files/ut_download_not_support_range_resume.txt";
let mut file = File::create(file_path).unwrap();
file.write(&[0; 1000]).unwrap();
let config = ConfigBuilder::new()
.action(Action::Download)
.mode(Mode::BackGround)
.file_spec(file)
.url("https://www.gitee.com/tiga-ultraman/downloadTests/releases/download/v1.01/test.txt")
.redirect(true)
.begins(5000)
.build();
let task = build_task(config);
ylong_runtime::block_on(async {
let res = download_inner(task).await.unwrap_err();
assert_eq!(res, TaskError::Failed(Reason::UnsupportedRangeRequest));
let file = File::open(file_path).unwrap();
assert_eq!(1000, file.metadata().unwrap().len());
});
}
#[test]
fn ut_download_range_0() {
init();
let file_path = "test_files/ut_download_range_0.txt";
let file = File::create(file_path).unwrap();
let config = ConfigBuilder::new()
.action(Action::Download)
.mode(Mode::BackGround)
.file_spec(file)
.url("https://sf3-cn.feishucdn.com/obj/ee-appcenter/47273f95/Feishu-win32_ia32-7.9.7-signed.exe")
.redirect(true)
.begins(5000)
.ends(10000)
.build();
let task = build_task(config);
ylong_runtime::block_on(async {
download_inner(task).await.unwrap();
let file = File::open(file_path).unwrap();
assert_eq!(5001, file.metadata().unwrap().len());
});
}
#[test]
fn ut_download_range_1() {
init();
let file_path = "test_files/ut_download_range_1.txt";
let file = File::create(file_path).unwrap();
let config = ConfigBuilder::new()
.action(Action::Download)
.mode(Mode::BackGround)
.file_spec(file)
.url("https://sf3-cn.feishucdn.com/obj/ee-appcenter/47273f95/Feishu-win32_ia32-7.9.7-signed.exe")
.redirect(true)
.begins(273619168)
.build();
let task = build_task(config);
ylong_runtime::block_on(async {
download_inner(task).await.unwrap();
let file = File::open(file_path).unwrap();
assert_eq!(FS_FILE_LEN - 273619168, file.metadata().unwrap().len());
});
}
#[test]
fn ut_download_range_resume_0() {
init();
let file_path = "test_files/ut_download_range_resume_0.txt";
let mut file = File::create(file_path).unwrap();
file.write(&[0; FS_FILE_LEN as usize - 10000]).unwrap();
let config = ConfigBuilder::new()
.action(Action::Download)
.mode(Mode::BackGround)
.file_spec(file)
.url("https://sf3-cn.feishucdn.com/obj/ee-appcenter/47273f95/Feishu-win32_ia32-7.9.7-signed.exe")
.redirect(true)
.build();
let task = build_task(config);
ylong_runtime::block_on(async {
download_inner(task).await.unwrap();
let file = File::open(file_path).unwrap();
assert_eq!(FS_FILE_LEN, file.metadata().unwrap().len());
});
}
#[test]
fn ut_download_range_resume_1() {
init();
let file_path = "test_files/ut_download_range_resume_1.txt";
let file = File::create(file_path).unwrap();
file.set_len(FS_FILE_LEN - 10000).unwrap();
let config = ConfigBuilder::new()
.action(Action::Download)
.mode(Mode::BackGround)
.file_spec(file)
.url("https://sf3-cn.feishucdn.com/obj/ee-appcenter/47273f95/Feishu-win32_ia32-7.9.7-signed.exe")
.redirect(true)
.build();
let task = build_task(config);
ylong_runtime::block_on(async {
let clone_task = task.clone();
ylong_runtime::spawn(async move {
ylong_runtime::time::sleep(time::Duration::from_secs(2)).await;
clone_task.status.lock().unwrap().state = State::Waiting;
});
let ret = download_inner(task.clone()).await.unwrap_err();
assert_eq!(ret, TaskError::Waiting(TaskPhase::UserAbort));
let file = File::open(file_path).unwrap();
assert!(file.metadata().unwrap().len() < FS_FILE_LEN - 20000);
download_inner(task.clone()).await.unwrap();
assert_eq!(file.metadata().unwrap().len(), FS_FILE_LEN);
});
}
#[test]
fn ut_download_invalid_task() {
init();
let file_path = "test_files/ut_download_basic.txt";
let file = File::create(file_path).unwrap();
let config = ConfigBuilder::new()
.action(Action::Download)
.mode(Mode::BackGround)
.file_spec(file)
.url("https://www.gitee.com/tiga-ultraman/downloadTests/releases/download/v1.01/test.txt")
.redirect(true)
.build();
let task = build_task(config);
{
let mut progress = task.progress.lock().unwrap();
progress.sizes = vec![0];
progress.processed = vec![];
progress.common_data.index = 23;
progress.common_data.state = State::Failed.repr;
progress.common_data.total_processed = 321223;
}
ylong_runtime::block_on(async {
download_inner(task.clone()).await.unwrap();
let file = File::open(file_path).unwrap();
assert_eq!(GITEE_FILE_LEN, file.metadata().unwrap().len());
assert_eq!(State::Completed, task.status.lock().unwrap().state);
assert_eq!(0, task.progress.lock().unwrap().common_data.index);
assert_eq!(
GITEE_FILE_LEN,
task.progress.lock().unwrap().common_data.total_processed as u64
);
assert_eq!(
GITEE_FILE_LEN,
task.progress.lock().unwrap().processed[0] as u64
);
assert_eq!(
GITEE_FILE_LEN,
task.progress.lock().unwrap().sizes[0] as u64
);
});
}
/// For xts SUB_REQUEST_CROSSPLATFORM_DOWNDLOAD_API_TASKINFO_0002,
/// downloadTotalBytes to be -1
#[test]
fn ut_download_sizes() {
init();
let file_path = "test_files/ut_download_basic.txt";
let file = File::create(file_path).unwrap();
let config = ConfigBuilder::new()
.action(Action::Download)
.mode(Mode::BackGround)
.file_spec(file)
.url("https://gitee.com/chenzhixue/downloadTest/releases/download/v1.0/test_not_exists.apk")
.redirect(true)
.build();
let task = build_task(config);
{
let mut progress = task.progress.lock().unwrap();
progress.sizes = vec![0, 1, 2, 3];
progress.processed = vec![];
progress.common_data.index = 23;
progress.common_data.state = State::Failed.repr;
progress.common_data.total_processed = 321223;
}
ylong_runtime::block_on(async {
let err = download_inner(task.clone()).await.unwrap_err();
assert_eq!(err, TaskError::Failed(Reason::ProtocolError));
let sizes = task.progress.lock().unwrap().sizes.clone();
assert_eq!(sizes, vec![-1]);
});
}
}

View File

@ -18,9 +18,12 @@ use super::info::{CommonTaskInfo, InfoSet, TaskInfo, UpdateInfo};
use super::notify::{CommonProgress, EachFileStatus, Progress};
use super::reason::Reason;
use crate::task::info::State;
use crate::utils::c_wrapper::{
CFileSpec, CFormItem, CStringWrapper, DeleteCFileSpec, DeleteCFormItem, DeleteCStringPtr,
};
use crate::utils::c_wrapper::{CFileSpec, CFormItem, CStringWrapper};
cfg_oh! {
use crate::utils::c_wrapper::{DeleteCFileSpec, DeleteCFormItem, DeleteCStringPtr};
}
use crate::utils::form_item::{FileSpec, FormItem};
use crate::utils::{build_vec, get_current_timestamp, split_string, string_to_hashmap};
@ -214,9 +217,12 @@ impl TaskInfo {
common_data: c_struct.common_data,
};
unsafe { DeleteCFormItem(c_struct.form_items_ptr) };
unsafe { DeleteCFileSpec(c_struct.file_specs_ptr) };
unsafe { DeleteCEachFileStatus(c_struct.each_file_status_ptr) };
#[cfg(feature = "oh")]
{
unsafe { DeleteCFormItem(c_struct.form_items_ptr) };
unsafe { DeleteCFileSpec(c_struct.file_specs_ptr) };
unsafe { DeleteCEachFileStatus(c_struct.each_file_status_ptr) };
}
task_info
}
}
@ -374,15 +380,20 @@ impl TaskConfig {
background: c_struct.common_data.background,
},
};
unsafe { DeleteCFormItem(c_struct.form_items_ptr) };
unsafe { DeleteCFileSpec(c_struct.file_specs_ptr) };
unsafe { DeleteCStringPtr(c_struct.body_file_names_ptr) };
unsafe { DeleteCStringPtr(c_struct.certs_path_ptr) };
#[cfg(feature = "oh")]
{
unsafe { DeleteCFormItem(c_struct.form_items_ptr) };
unsafe { DeleteCFileSpec(c_struct.file_specs_ptr) };
unsafe { DeleteCStringPtr(c_struct.body_file_names_ptr) };
unsafe { DeleteCStringPtr(c_struct.certs_path_ptr) };
}
task_config
}
}
#[cfg(feature = "oh")]
#[link(name = "download_server_cxx", kind = "static")]
extern "C" {
pub(crate) fn DeleteCEachFileStatus(ptr: *const CEachFileStatus);
}

View File

@ -125,24 +125,18 @@ impl State {
State::Failed => {
from != State::Completed && from != State::Removed && from != State::Stopped
}
State::Waiting => from.is_doing(),
State::Waiting => from.is_doing() || from == State::Initialized,
State::Running | State::Retrying => {
from == State::Waiting
|| from == State::Paused
|| from == State::Stopped
|| from == State::Failed
|| from == State::Initialized
}
State::Removed => from != State::Removed,
_ => false,
}
}
pub(crate) fn check_resume(&self) -> bool {
*self == State::Waiting
|| *self == State::Paused
|| *self == State::Failed
|| *self == State::Stopped
}
}
pub(crate) struct UpdateInfo {

View File

@ -11,20 +11,20 @@
// See the License for the specific language governing permissions and
// limitations under the License.
pub(crate) mod client;
/// request task config
pub mod config;
/// request task info
pub mod info;
pub(crate) mod reason;
pub(crate) mod download;
pub(crate) mod ffi;
pub(crate) mod files;
pub(crate) mod notify;
mod operator;
pub(crate) mod reason;
pub(crate) mod request_task;
pub(crate) mod upload;
pub(crate) const ATOMIC_SERVICE: u32 = 1;
pub(crate) mod client;
pub(crate) mod ffi;
pub(crate) mod upload;

View File

@ -56,7 +56,8 @@ pub(crate) struct Progress {
pub(crate) extras: HashMap<String, String>,
}
#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq, Eq)]
#[repr(C)]
pub(crate) struct EachFileStatus {
pub(crate) path: String,
pub(crate) reason: Reason,

View File

@ -22,12 +22,15 @@ use ylong_http_client::HttpClientError;
use ylong_runtime::io::AsyncWrite;
use ylong_runtime::time::{sleep, Sleep};
use super::config::Mode;
use crate::manage::account::is_active_user;
use crate::manage::notifier::Notifier;
cfg_oh! {
use crate::manage::account::is_active_user;
use crate::manage::notifier::Notifier;
use super::config::Mode;
use crate::task::reason::Reason;
}
use crate::info::State;
use crate::task::config::Version;
use crate::task::info::State;
use crate::task::reason::Reason;
use crate::task::request_task::RequestTask;
use crate::utils::get_current_timestamp;
@ -61,30 +64,40 @@ impl TaskOperator {
let current = get_current_timestamp();
let state = self.task.status.lock().unwrap().state;
if (state != State::Running && state != State::Retrying)
|| (self.task.conf.version == Version::API10 && !self.task.check_network_status())
{
info!("pause the task, tid: {}", self.task.task_id());
return Poll::Ready(Err(HttpClientError::user_aborted()));
}
if !self.task.check_app_state() {
info!("pause for app state, tid: {}", self.task.task_id());
if state != State::Running && state != State::Retrying {
info!("pause task {} new state {:?} ", self.task.task_id(), state);
return Poll::Ready(Err(HttpClientError::user_aborted()));
}
if self.task.conf.common_data.mode == Mode::BackGround && !is_active_user(self.task.uid()) {
info!("pause for user stopped, tid: {}", self.task.task_id());
self.task
.change_task_status(State::Waiting, Reason::AccountStopped);
if self.task.conf.version == Version::API10 && !self.task.check_network_status() {
info!("pause for network status, tid: {}", self.task.task_id());
return Poll::Ready(Err(HttpClientError::user_aborted()));
}
#[cfg(feature = "oh")]
{
if !self.task.check_app_state() {
info!("pause for app state, tid: {}", self.task.task_id());
return Poll::Ready(Err(HttpClientError::user_aborted()));
}
if self.task.conf.common_data.mode == Mode::BackGround
&& !is_active_user(self.task.uid())
{
info!("pause for user stopped, tid: {}", self.task.task_id());
self.task
.change_task_status(State::Waiting, Reason::AccountStopped);
return Poll::Ready(Err(HttpClientError::user_aborted()));
}
if current >= self.task.last_notify.load(Ordering::SeqCst) + FRONT_NOTIFY_INTERVAL {
let notify_data = self.task.build_notify_data();
self.task.last_notify.store(current, Ordering::SeqCst);
Notifier::progress(&self.task.client_manager, notify_data);
}
}
let version = self.task.conf.version;
if current >= self.task.last_notify.load(Ordering::SeqCst) + FRONT_NOTIFY_INTERVAL {
let notify_data = self.task.build_notify_data();
self.task.last_notify.store(current, Ordering::SeqCst);
Notifier::progress(&self.task.client_manager, notify_data);
}
let gauge = self.task.conf.common_data.gauge;
if version == Version::API9 || gauge {
@ -146,38 +159,6 @@ impl TaskOperator {
Poll::Ready(Ok(()))
}
pub(crate) fn poll_write_partial_file(
&self,
cx: &mut Context<'_>,
data: &[u8],
begins: u64,
ends: i64,
) -> Poll<Result<usize, HttpClientError>> {
let data_size = data.len();
let skip_size = self.task.skip_bytes.load(Ordering::SeqCst);
if skip_size + data_size as u64 <= begins {
self.task
.skip_bytes
.fetch_add(data_size as u64, Ordering::SeqCst);
return Poll::Ready(Ok(data_size));
}
let remain_skip_bytes = (begins - skip_size) as usize;
let mut data = &data[remain_skip_bytes..];
self.task.skip_bytes.store(begins, Ordering::SeqCst);
if ends >= 0 {
let total_bytes = ends as u64 - begins + 1;
let written_bytes = self.task.progress.lock().unwrap().processed[0] as u64;
if written_bytes == total_bytes {
return Poll::Ready(Err(HttpClientError::user_aborted()));
}
if data.len() as u64 + written_bytes >= total_bytes {
let remain_bytes = (total_bytes - written_bytes) as usize;
data = &data[..remain_bytes];
}
}
self.poll_write_file(cx, data, remain_skip_bytes)
}
pub(crate) fn poll_write_file(
&self,
cx: &mut Context<'_>,

View File

@ -11,40 +11,42 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::io::SeekFrom;
use std::io::{self, SeekFrom};
use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU32, AtomicU64, AtomicUsize, Ordering};
use std::sync::{Mutex, MutexGuard};
use std::thread::sleep;
use std::time::Duration;
use ylong_http_client::async_impl::{Body, Client, Request, RequestBuilder, Response};
use ylong_http_client::{ErrorKind, HttpClientError};
use ylong_runtime::io::{AsyncSeekExt, AsyncWriteExt};
cfg_oh! {
use crate::manage::app_state::AppState;
use crate::manage::database::Database;
use crate::manage::notifier::Notifier;
use crate::manage::SystemConfig;
use crate::service::client::ClientManagerEntry;
use super::info::UpdateInfo;
use crate::utils::publish_state_change_event;
use crate::utils::{request_background_notify, RequestTaskMsg};
}
use super::config::{Mode, Version};
use super::info::{CommonTaskInfo, State, TaskInfo, UpdateInfo};
use super::info::{CommonTaskInfo, State, TaskInfo};
use super::notify::{EachFileStatus, NotifyData, Progress};
use super::reason::Reason;
use crate::error::ErrorCode;
use crate::manage::app_state::AppState;
use crate::manage::database::Database;
use crate::manage::notifier::Notifier;
use crate::manage::{self, SystemConfig};
use crate::service::client::ClientManagerEntry;
use crate::task::client::build_client;
use crate::task::config::{Action, TaskConfig};
use crate::task::files::{AttachedFiles, Files};
use crate::utils::form_item::FileSpec;
use crate::utils::{
get_current_timestamp, publish_state_change_event, request_background_notify, RequestTaskMsg,
};
use crate::utils::get_current_timestamp;
const RETRY_INTERVAL: u64 = 200;
#[allow(unused)]
pub(crate) struct RequestTask {
pub(crate) conf: TaskConfig,
pub(crate) app_state: AppState,
pub(crate) client: Client,
pub(crate) client_manager: ClientManagerEntry,
pub(crate) files: Files,
pub(crate) body_files: Files,
pub(crate) ctime: u64,
@ -67,7 +69,11 @@ pub(crate) struct RequestTask {
pub(crate) upload_counts: AtomicUsize,
pub(crate) rate_limiting: AtomicU64,
pub(crate) last_notify: AtomicU64,
pub(crate) network: manage::Network,
pub(crate) network: crate::manage::Network,
#[cfg(feature = "oh")]
pub(crate) app_state: AppState,
#[cfg(feature = "oh")]
pub(crate) client_manager: ClientManagerEntry,
}
impl RequestTask {
@ -107,8 +113,8 @@ impl RequestTask {
}
}
pub(crate) fn satisfied(&self) -> bool {
if !self.network_online() || !self.check_network_status() {
pub(crate) async fn satisfied(&self) -> bool {
if !self.network_online().await || !self.check_network_status() {
error!("check network failed, tid: {}", self.task_id());
false
} else {
@ -120,11 +126,11 @@ impl RequestTask {
impl RequestTask {
pub(crate) fn new(
config: TaskConfig,
app_state: AppState,
#[cfg(feature = "oh")] app_state: AppState,
files: AttachedFiles,
client: Client,
client_manager: ClientManagerEntry,
network: manage::Network,
#[cfg(feature = "oh")] client_manager: ClientManagerEntry,
network: crate::manage::Network,
) -> RequestTask {
let file_len = files.files.len();
let action = config.common_data.action;
@ -170,22 +176,27 @@ impl RequestTask {
skip_bytes: AtomicU64::new(0),
upload_counts: AtomicUsize::new(0),
rate_limiting: AtomicU64::new(0),
app_state,
last_notify: AtomicU64::new(time),
client_manager,
network,
#[cfg(feature = "oh")]
client_manager,
#[cfg(feature = "oh")]
app_state,
}
}
pub(crate) fn new_by_info(
config: TaskConfig,
system: SystemConfig,
app_state: AppState,
#[cfg(feature = "oh")] system: SystemConfig,
#[cfg(feature = "oh")] app_state: AppState,
info: TaskInfo,
client_manager: ClientManagerEntry,
network: manage::Network,
#[cfg(feature = "oh")] client_manager: ClientManagerEntry,
network: crate::manage::Network,
) -> Result<RequestTask, ErrorCode> {
#[cfg(feature = "oh")]
let (files, client) = check_config(&config, system)?;
#[cfg(not(feature = "oh"))]
let (files, client) = check_config(&config)?;
let file_len = files.files.len();
let action = config.common_data.action;
@ -242,10 +253,12 @@ impl RequestTask {
skip_bytes: AtomicU64::new(0),
upload_counts: AtomicUsize::new(upload_counts),
rate_limiting: AtomicU64::new(0),
app_state,
last_notify: AtomicU64::new(time),
client_manager,
network,
#[cfg(feature = "oh")]
client_manager,
#[cfg(feature = "oh")]
app_state,
})
}
@ -270,12 +283,14 @@ impl RequestTask {
&& self.conf.common_data.retry)
{
self.change_task_status(State::Failed, Reason::UnsupportedNetworkType);
} else {
self.change_task_status(State::Waiting, Reason::UnsupportedNetworkType);
}
return false;
}
true
}
#[cfg(feature = "oh")]
pub(crate) fn check_app_state(&self) -> bool {
if self.conf.common_data.mode == Mode::FrontEnd && !self.app_state.is_foreground() {
if self.conf.common_data.action == Action::Upload {
@ -289,7 +304,7 @@ impl RequestTask {
}
}
pub(crate) fn network_online(&self) -> bool {
pub(crate) async fn network_online(&self) -> bool {
if !self.network.is_online() {
if self.conf.version == Version::API10
&& self.conf.common_data.mode == Mode::BackGround
@ -302,7 +317,7 @@ impl RequestTask {
if self.network.is_online() {
return true;
}
sleep(Duration::from_millis(RETRY_INTERVAL));
ylong_runtime::time::sleep(Duration::from_millis(RETRY_INTERVAL));
}
self.change_task_status(State::Failed, Reason::NetworkOffline);
}
@ -311,6 +326,23 @@ impl RequestTask {
true
}
pub(crate) fn prepare_running(&self) {
let old_state = self.status.lock().unwrap().state;
info!(
"task {} prepare running, action: {:?}, state :{:?}",
self.task_id(),
self.action(),
old_state,
);
#[cfg(feature = "oh")]
if old_state == State::Paused {
let notify_data = self.build_notify_data();
Notifier::resume(&self.client_manager, notify_data);
}
self.change_task_status(State::Running, Reason::Default);
}
pub(crate) fn build_request_builder(&self) -> Result<RequestBuilder, HttpClientError> {
use ylong_http_client::async_impl::PercentEncoder;
@ -346,111 +378,99 @@ impl RequestTask {
Ok(request)
}
async fn clear_downloaded_file(&self) -> bool {
pub(crate) async fn clear_downloaded_file(&self) -> Result<(), std::io::Error> {
info!("task {} clear downloaded file", self.task_id());
let file = self.files.get_mut(0).unwrap();
let res = file.set_len(0).await;
match res {
Err(e) => {
error!("clear download file error: {:?}", e);
self.change_task_status(State::Failed, Reason::IoError);
false
}
_ => {
debug!("set len success");
match file.seek(SeekFrom::Start(0)).await {
Err(e) => {
error!("seek err is {:?}", e);
self.change_task_status(State::Failed, Reason::IoError);
false
}
Ok(_) => {
debug!("seek success");
let mut progress_guard = self.progress.lock().unwrap();
progress_guard.common_data.total_processed = 0;
progress_guard.processed[0] = 0;
true
}
}
}
}
file.set_len(0).await?;
file.seek(SeekFrom::Start(0)).await?;
let mut progress_guard = self.progress.lock().unwrap();
progress_guard.common_data.total_processed = 0;
progress_guard.processed[0] = 0;
Ok(())
}
pub(crate) async fn build_download_request(&self) -> Option<Request> {
let mut request_builder = match self.build_request_builder() {
Ok(builder) => builder,
_ => {
self.change_task_status(State::Failed, Reason::BuildRequestFailed);
return None;
pub(crate) async fn build_download_request(&self) -> Result<Request, TaskError> {
let mut request_builder = self.build_request_builder()?;
let file = self.files.get_mut(0).unwrap();
let has_downloaded = file.metadata().await?.len();
let resume_download = has_downloaded > 0;
let require_range = self.require_range();
let begins = self.conf.common_data.begins;
let ends = self.conf.common_data.ends;
info!(
"task {} build download request, resume_download: {}, require_range: {}",
self.task_id(),
resume_download,
require_range
);
match (resume_download, require_range) {
(true, false) => {
let (builder, support_range) = self.support_range(request_builder);
request_builder = builder;
if support_range {
request_builder =
self.range_request(request_builder, begins + has_downloaded, ends);
} else {
self.clear_downloaded_file().await?;
}
}
(false, true) => {
request_builder = self.range_request(request_builder, begins, ends);
}
(true, true) => {
let (builder, support_range) = self.support_range(request_builder);
request_builder = builder;
if support_range {
request_builder =
self.range_request(request_builder, begins + has_downloaded, ends);
} else {
return Err(TaskError::Failed(Reason::UnsupportedRangeRequest));
}
}
(false, false) => {}
};
let mut begins = self.conf.common_data.begins;
let ends = self.conf.common_data.ends;
self.range_response.store(false, Ordering::SeqCst);
if self.resume.load(Ordering::SeqCst) || begins > 0 || ends >= 0 {
self.range_request.store(true, Ordering::SeqCst);
self.skip_bytes.store(0, Ordering::SeqCst);
if self.resume.load(Ordering::SeqCst) {
let if_range = {
let progress_guard = self.progress.lock().unwrap();
let etag = progress_guard.extras.get("etag");
let last_modified = progress_guard.extras.get("last-modified");
if let Some(etag) = etag {
request_builder = request_builder.header("If-Range", etag.as_str());
true
} else if let Some(last_modified) = last_modified {
request_builder =
request_builder.header("If-Range", last_modified.as_str());
true
} else {
false
}
};
if !if_range {
// unable to verify file consistency, need download again
if begins == 0 && ends < 0 {
self.range_request.store(false, Ordering::SeqCst);
}
if !self.clear_downloaded_file().await {
return None;
}
}
}
let file = self.files.get_mut(0).unwrap();
let current_len = file.metadata().await.unwrap().len();
begins += current_len;
// Modifys the progress to the current file size.
// It will be recorded to the database later during download.
let mut progress_guard = self.progress.lock().unwrap();
progress_guard.processed[0] = current_len as usize;
progress_guard.common_data.total_processed = current_len as usize;
if self.range_request.load(Ordering::SeqCst) {
let range = if ends < 0 {
format!("bytes={begins}-")
} else {
format!("bytes={begins}-{ends}")
};
request_builder = request_builder.header("Range", range.as_str());
}
} else {
self.range_request.store(false, Ordering::SeqCst);
}
let result = request_builder.body(Body::slice(self.conf.data.clone()));
match result {
Ok(value) => Some(value),
Err(e) => {
error!("build download request error is {:?}", e);
self.change_task_status(State::Failed, Reason::BuildRequestFailed);
None
}
}
let request = request_builder.body(Body::slice(self.conf.data.clone()))?;
Ok(request)
}
pub(crate) fn get_file_info(&self, response: &Response) -> bool {
if self.get_file_info.load(Ordering::SeqCst) {
return true;
fn range_request(
&self,
request_builder: RequestBuilder,
begins: u64,
ends: i64,
) -> RequestBuilder {
let range = if ends < 0 {
format!("bytes={begins}-")
} else {
format!("bytes={begins}-{ends}")
};
request_builder.header("Range", range.as_str())
}
fn support_range(&self, mut request_builder: RequestBuilder) -> (RequestBuilder, bool) {
let progress_guard = self.progress.lock().unwrap();
let mut support_range = false;
if let Some(etag) = progress_guard.extras.get("etag") {
request_builder = request_builder.header("If-Range", etag.as_str());
support_range = true;
} else if let Some(last_modified) = progress_guard.extras.get("last-modified") {
request_builder = request_builder.header("If-Range", last_modified.as_str());
support_range = true;
}
self.get_file_info.store(true, Ordering::SeqCst);
if !support_range {
info!("task {} does not support range request", self.task_id());
}
(request_builder, support_range)
}
pub(crate) fn get_file_info(&self, response: &Response) -> Result<(), TaskError> {
let content_type = response.headers().get("content-type");
if let Some(mime_type) = content_type {
if let Ok(value) = mime_type.to_string() {
@ -459,167 +479,72 @@ impl RequestTask {
}
let content_length = response.headers().get("content-length");
if let Some(len) = content_length {
let length = len.to_string();
match length {
Ok(value) => {
let len = value.parse::<i64>();
match len {
Ok(v) => {
let mut guard = self.progress.lock().unwrap();
if !self.restored.load(Ordering::SeqCst) {
guard.sizes[0] = v + guard.processed[0] as i64;
}
self.file_total_size.store(v, Ordering::SeqCst);
debug!("the download task content-length is {}", v);
}
Err(e) => {
error!("convert string to i64 error: {:?}", e);
}
}
if let Some(Ok(len)) = content_length.map(|v| v.to_string()) {
match len.parse::<i64>() {
Ok(v) => {
let mut progress = self.progress.lock().unwrap();
progress.sizes = vec![v + progress.processed[0] as i64];
self.file_total_size.store(v, Ordering::SeqCst);
debug!("the download task content-length is {}", v);
}
Err(e) => {
error!("convert header value to string error: {:?}", e);
error!("convert string to i64 error: {:?}", e);
}
}
} else {
error!("cannot get content-length of the task");
if self.conf.common_data.precise {
self.change_task_status(State::Failed, Reason::GetFileSizeFailed);
return false;
return Err(TaskError::Failed(Reason::GetFileSizeFailed));
}
}
true
Ok(())
}
async fn handle_body_transfer_error(&self) {
pub(crate) async fn handle_body_transfer_error(&self) -> Result<(), TaskError> {
#[cfg(feature = "oh")]
if self.network.check_interval_online().await {
self.change_task_status(State::Failed, Reason::OthersError);
return Err(TaskError::Failed(Reason::OthersError));
} else {
match self.conf.version {
Version::API9 => {
if self.conf.common_data.action == Action::Upload {
self.change_task_status(State::Failed, Reason::NetworkOffline);
return Err(TaskError::Failed(Reason::NetworkOffline));
}
}
Version::API10 => {
if self.conf.common_data.mode == Mode::FrontEnd || !self.conf.common_data.retry
{
self.change_task_status(State::Failed, Reason::NetworkOffline);
return Err(TaskError::Failed(Reason::NetworkOffline));
}
}
}
}
Err(TaskError::Waiting(TaskPhase::NetworkOffline))
}
pub(crate) async fn handle_download_error(&self, result: &Result<(), HttpClientError>) -> bool {
match result {
Ok(_) => true,
Err(err) => {
error!("download err is {:?}", err);
match err.error_kind() {
ErrorKind::Timeout => {
self.change_task_status(State::Failed, Reason::ContinuousTaskTimeout);
}
// user triggered
ErrorKind::UserAborted => return true,
ErrorKind::BodyTransfer | ErrorKind::BodyDecode => {
self.handle_body_transfer_error().await;
}
_ => {
if format!("{}", err).contains("No space left on device") {
self.change_task_status(State::Failed, Reason::InsufficientSpace);
} else {
self.change_task_status(State::Failed, Reason::OthersError);
}
}
}
false
}
}
}
pub(crate) async fn handle_response_error(
pub(crate) async fn handle_download_error(
&self,
response: &Result<Response, HttpClientError>,
) -> bool {
let index = self.progress.lock().unwrap().common_data.index;
match response {
Ok(r) => {
let http_response_code = r.status();
info!(
"task {} get http response code {}",
self.conf.common_data.task_id, http_response_code
);
if http_response_code.is_server_error()
|| (http_response_code.as_u16() != 408 && http_response_code.is_client_error())
|| http_response_code.is_redirection()
{
self.set_code(index, Reason::ProtocolError, false);
return false;
}
if http_response_code.as_u16() == 408 {
if !self.retry_for_request.load(Ordering::SeqCst) {
self.retry_for_request.store(true, Ordering::SeqCst);
} else {
self.set_code(index, Reason::ProtocolError, false);
}
return false;
}
if self.range_request.load(Ordering::SeqCst) {
match http_response_code.as_u16() {
206 => {
self.range_response.store(true, Ordering::SeqCst);
}
200 => {
self.range_response.store(false, Ordering::SeqCst);
if self.resume.load(Ordering::SeqCst) {
if !self.clear_downloaded_file().await {
return false;
}
} else {
self.set_code(index, Reason::UnsupportedRangeRequest, false);
return false;
}
}
_ => {}
}
}
true
err: HttpClientError,
) -> Result<(), TaskError> {
error!("download err is {:?}", err);
match err.error_kind() {
ErrorKind::Timeout => Err(TaskError::Failed(Reason::ContinuousTaskTimeout)),
// user triggered
ErrorKind::UserAborted => Err(TaskError::Waiting(TaskPhase::UserAbort)),
ErrorKind::BodyTransfer | ErrorKind::BodyDecode => {
self.handle_body_transfer_error().await
}
Err(e) => {
error!("http client err is {:?}", e);
match e.error_kind() {
ErrorKind::UserAborted => self.set_code(index, Reason::UserOperation, false),
ErrorKind::Timeout => {
self.set_code(index, Reason::ContinuousTaskTimeout, false)
}
ErrorKind::Request => self.set_code(index, Reason::RequestError, false),
ErrorKind::Redirect => self.set_code(index, Reason::RedirectError, false),
ErrorKind::Connect | ErrorKind::ConnectionUpgrade => {
if e.is_dns_error() {
self.set_code(index, Reason::Dns, false);
} else if e.is_tls_error() {
self.set_code(index, Reason::Ssl, false);
} else {
self.set_code(index, Reason::Tcp, false);
}
}
ErrorKind::BodyTransfer => self.handle_body_transfer_error().await,
_ => {
if format!("{}", e).contains("No space left on device") {
self.set_code(index, Reason::InsufficientSpace, false);
} else {
self.set_code(index, Reason::OthersError, false);
}
}
_ => {
if format!("{}", err).contains("No space left on device") {
Err(TaskError::Failed(Reason::InsufficientSpace))
} else {
Err(TaskError::Failed(Reason::OthersError))
}
false
}
}
}
#[cfg(feature = "oh")]
pub(crate) fn notify_response(&self, response: &Response) {
let tid = self.conf.common_data.task_id;
let version: String = response.version().as_str().into();
@ -637,17 +562,8 @@ impl RequestTask {
.send_response(tid, version, status_code, status_message, headers)
}
pub(crate) fn record_response_header(&self, response: &Result<Response, HttpClientError>) {
if let Ok(r) = response {
self.notify_response(r);
let mut guard = self.progress.lock().unwrap();
guard.extras.clear();
for (k, v) in r.headers() {
if let Ok(value) = v.to_string() {
guard.extras.insert(k.to_string().to_lowercase(), value);
}
}
}
pub(crate) fn require_range(&self) -> bool {
self.conf.common_data.begins > 0 || self.conf.common_data.ends >= 0
}
pub(crate) async fn record_upload_response(
@ -655,7 +571,6 @@ impl RequestTask {
index: usize,
response: Result<Response, HttpClientError>,
) {
self.record_response_header(&response);
if let Ok(mut r) = response {
let file = match self.body_files.get_mut(index) {
Some(file) => file,
@ -681,10 +596,6 @@ impl RequestTask {
}
pub(crate) fn set_code(&self, index: usize, code: Reason, is_force: bool) {
// why?
if code == Reason::UploadFileError {
return;
}
// `unwrap` for propagating panics among threads.
let mut codes_guard = self.code.lock().unwrap();
match codes_guard.get_mut(index) {
@ -694,7 +605,7 @@ impl RequestTask {
}
}
None => {
info!(
error!(
"set code index error; tid: {}, index: {}, code: {:?}",
self.conf.common_data.task_id, index, code
);
@ -736,22 +647,27 @@ impl RequestTask {
} else {
self.set_code(index, to_reason, false);
}
let codes_guard = self.code.lock().unwrap();
let update_info = UpdateInfo {
mtime: task_status.mtime,
reason: task_status.reason.repr,
progress: progress.clone(),
each_file_status: RequestTask::get_each_file_status_by_code(
&codes_guard,
&self.conf.file_specs,
),
tries: self.tries.load(Ordering::SeqCst),
mime_type: self.mime_type.lock().unwrap().clone(),
};
Database::get_instance().update_task(self.task_id(), update_info);
#[cfg(feature = "oh")]
{
let codes_guard = self.code.lock().unwrap();
let update_info = UpdateInfo {
mtime: task_status.mtime,
reason: task_status.reason.repr,
progress: progress.clone(),
each_file_status: RequestTask::get_each_file_status_by_code(
&codes_guard,
&self.conf.file_specs,
),
tries: self.tries.load(Ordering::SeqCst),
mime_type: self.mime_type.lock().unwrap().clone(),
};
Database::get_instance().update_task(self.task_id(), update_info);
}
ErrorCode::ErrOk
}
#[cfg(feature = "oh")]
pub(crate) fn state_change_notify(&self) {
let state = self.status.lock().unwrap().state;
let total_processed = self.progress.lock().unwrap().common_data.total_processed;
@ -794,6 +710,7 @@ impl RequestTask {
self.background_notify();
}
#[cfg(feature = "oh")]
pub(crate) fn state_change_notify_of_no_run(
client_manager: &ClientManagerEntry,
notify_data: NotifyData,
@ -941,21 +858,25 @@ impl RequestTask {
if index >= self.conf.file_specs.len() {
return;
}
let percent = total_processed * 100 / (file_total_size as u64);
debug!("background notify");
let task_msg = RequestTaskMsg {
task_id: self.conf.common_data.task_id,
uid: self.conf.common_data.uid as i32,
action: self.conf.common_data.action.repr,
};
let path = self.conf.file_specs[index].path.as_str();
let file_name = self.conf.file_specs[index].file_name.as_str();
let _ = request_background_notify(task_msg, path, file_name, percent as u32);
#[cfg(feature = "oh")]
{
let percent = total_processed * 100 / (file_total_size as u64);
let task_msg = RequestTaskMsg {
task_id: self.conf.common_data.task_id,
uid: self.conf.common_data.uid as i32,
action: self.conf.common_data.action.repr,
};
let path = self.conf.file_specs[index].path.as_str();
let file_name = self.conf.file_specs[index].file_name.as_str();
let _ = request_background_notify(task_msg, path, file_name, percent as u32);
}
}
pub(crate) fn get_upload_info(&self, index: usize) -> (bool, u64) {
let guard = self.progress.lock().unwrap();
let file_size = guard.sizes[index];
let mut is_partial_upload = false;
let mut upload_file_length: u64 = file_size as u64 - guard.processed[index] as u64;
@ -978,6 +899,7 @@ impl RequestTask {
(is_partial_upload, upload_file_length)
}
#[cfg(feature = "oh")]
pub(crate) fn notify_header_receive(&self) {
if self.conf.version == Version::API9 && self.conf.common_data.action == Action::Upload {
let notify_data = self.build_notify_data();
@ -1025,12 +947,41 @@ fn check_file_specs(file_specs: &[FileSpec]) -> bool {
pub(crate) fn check_config(
config: &TaskConfig,
system: SystemConfig,
#[cfg(feature = "oh")] system: SystemConfig,
) -> Result<(AttachedFiles, Client), ErrorCode> {
if !check_file_specs(&config.file_specs) {
return Err(ErrorCode::Other);
}
let files = AttachedFiles::open(config).map_err(|_| ErrorCode::FileOperationErr)?;
#[cfg(feature = "oh")]
let client = build_client(config, system).map_err(|_| ErrorCode::Other)?;
#[cfg(not(feature = "oh"))]
let client = build_client(config).map_err(|_| ErrorCode::Other)?;
Ok((files, client))
}
impl From<HttpClientError> for TaskError {
fn from(_value: HttpClientError) -> Self {
TaskError::Failed(Reason::BuildRequestFailed)
}
}
impl From<io::Error> for TaskError {
fn from(_value: io::Error) -> Self {
TaskError::Failed(Reason::IoError)
}
}
#[derive(Debug, PartialEq, Eq)]
pub enum TaskPhase {
NeedRetry,
UserAbort,
NetworkOffline,
}
#[derive(Debug, PartialEq, Eq)]
pub enum TaskError {
Failed(Reason),
Waiting(TaskPhase),
}

View File

@ -18,11 +18,12 @@ use std::sync::Arc;
use std::task::{Context, Poll};
use ylong_http_client::async_impl::{Body, MultiPart, Part, Request, UploadOperator, Uploader};
use ylong_http_client::HttpClientError;
use ylong_runtime::io::{AsyncRead, AsyncSeek, ReadBuf};
use ylong_http_client::{ErrorKind, HttpClientError};
use ylong_runtime::io::{AsyncRead, AsyncSeek, AsyncSeekExt, ReadBuf};
use super::operator::TaskOperator;
use super::reason::Reason;
use super::request_task::{TaskError, TaskPhase};
use crate::task::info::State;
use crate::task::request_task::RequestTask;
#[cfg(feature = "oh")]
@ -30,11 +31,12 @@ use crate::trace::Trace;
struct TaskReader {
pub(crate) task: Arc<RequestTask>,
pub(crate) index: usize,
}
impl TaskReader {
pub(crate) fn new(task: Arc<RequestTask>) -> Self {
Self { task }
pub(crate) fn new(task: Arc<RequestTask>, index: usize) -> Self {
Self { task, index }
}
}
@ -44,7 +46,7 @@ impl AsyncRead for TaskReader {
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<std::io::Result<()>> {
let index = self.task.progress.lock().unwrap().common_data.index;
let index = self.index;
let file = self.task.files.get_mut(index).unwrap();
let (is_partial_upload, total_upload_bytes) = self.task.get_upload_info(index);
let mut progress_guard = self.task.progress.lock().unwrap();
@ -108,7 +110,7 @@ impl UploadOperator for TaskOperator {
fn build_stream_request(task: Arc<RequestTask>, index: usize) -> Option<Request> {
debug!("build stream request");
let task_reader = TaskReader::new(task.clone());
let task_reader = TaskReader::new(task.clone(), index);
let task_operator = TaskOperator::new(task.clone());
match task.build_request_builder() {
@ -135,7 +137,7 @@ fn build_stream_request(task: Arc<RequestTask>, index: usize) -> Option<Request>
fn build_multipart_request(task: Arc<RequestTask>, index: usize) -> Option<Request> {
debug!("build multipart request");
let task_reader = TaskReader::new(task.clone());
let task_reader = TaskReader::new(task.clone(), index);
let task_operator = TaskOperator::new(task.clone());
let mut multi_part = MultiPart::new();
for item in task.conf.form_items.iter() {
@ -190,8 +192,46 @@ fn build_request_common(
}
}
impl RequestTask {
fn prepare_single_upload(&self, index: usize) -> bool {
if let Some(file) = self.files.get_mut(index) {
let mut progress = self.progress.lock().unwrap();
progress.common_data.index = index;
progress.common_data.total_processed = 0;
progress.processed[index] = 0;
file.seek(SeekFrom::Start(0));
true
} else {
error!("task {} file {} not found", self.task_id(), index);
false
}
}
}
pub(crate) async fn upload(task: Arc<RequestTask>) {
debug!("begin upload task, tid: {}", task.conf.common_data.task_id);
loop {
if let Err(e) = upload_inner(task.clone()).await {
match e {
TaskError::Failed(reason) => {
task.change_task_status(State::Failed, reason);
}
TaskError::Waiting(phase) => match phase {
TaskPhase::NeedRetry => {
continue;
}
TaskPhase::UserAbort => {}
TaskPhase::NetworkOffline => {}
},
}
}
break;
}
}
async fn upload_inner(task: Arc<RequestTask>) -> Result<(), TaskError> {
task.prepare_running();
info!("upload task {} start running", task.task_id());
#[cfg(feature = "oh")]
{
@ -202,81 +242,44 @@ pub(crate) async fn upload(task: Arc<RequestTask>) {
}
let size = task.conf.file_specs.len();
loop {
let index = task.progress.lock().unwrap().common_data.index;
let start = task.progress.lock().unwrap().common_data.index;
for index in start..size {
if !task.prepare_single_upload(index) {
return Err(TaskError::Failed(Reason::OthersError));
}
let is_multipart = match task.conf.headers.get("Content-Type") {
Some(s) => s.eq("multipart/form-data"),
None => task.conf.method.to_uppercase().eq("POST"),
};
let result = if is_multipart {
upload_one_file(task.clone(), index, build_multipart_request).await
if is_multipart {
upload_one_file(task.clone(), index, build_multipart_request).await?
} else {
upload_one_file(task.clone(), index, build_stream_request).await
upload_one_file(task.clone(), index, build_stream_request).await?
};
if result {
info!(
"upload one file success, tid: {}, index is {}, size is {}",
task.conf.common_data.task_id, index, size
);
task.upload_counts.fetch_add(1, Ordering::SeqCst);
}
{
let task_status = task.status.lock().unwrap();
if !task_status.state.is_doing() {
return;
}
}
task.notify_header_receive();
let mut progress = task.progress.lock().unwrap();
if index + 1 == size {
break;
}
progress.common_data.index += 1;
progress.common_data.index = 0;
#[cfg(feature = "oh")]
task.notify_header_receive();
}
let uploaded = task.upload_counts.load(Ordering::SeqCst);
if uploaded == size {
task.change_task_status(State::Completed, Reason::Default);
} else {
task.change_task_status(State::Failed, Reason::UploadFileError);
use hisysevent::{build_number_param, build_str_param};
use crate::sys_event::SysEvent;
// Records sys event.
SysEvent::task_fault()
.param(build_str_param!(crate::sys_event::TASKS_TYPE, "UPLOAD"))
.param(build_number_param!(crate::sys_event::TOTAL_FILE_NUM, size))
.param(build_number_param!(
crate::sys_event::FAIL_FILE_NUM,
size - uploaded
))
.param(build_number_param!(
crate::sys_event::SUCCESS_FILE_NUM,
uploaded
))
.param(build_number_param!(
crate::sys_event::ERROR_INFO,
Reason::UploadFileError.repr as i32
))
.write();
}
debug!("upload end, tid: {}", task.conf.common_data.task_id);
task.change_task_status(State::Completed, Reason::Default);
info!("task {} upload success", task.task_id());
Ok(())
}
async fn upload_one_file<F>(task: Arc<RequestTask>, index: usize, build_upload_request: F) -> bool
async fn upload_one_file<F>(
task: Arc<RequestTask>,
index: usize,
build_upload_request: F,
) -> Result<(), TaskError>
where
F: Fn(Arc<RequestTask>, usize) -> Option<Request>,
{
info!(
"begin upload one file, tid: {}, index is {}",
task.conf.common_data.task_id, index
task.conf.common_data.task_id, index,
);
// Ensures `_trace` can only be freed when this function exits.
@ -289,43 +292,148 @@ where
));
}
loop {
task.set_code(index, Reason::Default, true);
let request: Option<Request> = build_upload_request(task.clone(), index);
if request.is_none() {
return false;
}
let response = task.client.request(request.unwrap()).await;
if task.handle_response_error(&response).await {
// `unwrap` for propagating panics among threads.
if let Some(code) = task.code.lock().unwrap().get_mut(index) {
*code = Reason::Default;
}
task.record_upload_response(index, response).await;
return true;
}
task.record_upload_response(index, response).await;
// `unwrap` for propagating panics among threads.
let Some(request) = build_upload_request(task.clone(), index) else {
return Err(TaskError::Failed(Reason::BuildRequestFailed));
};
let state = task.status.lock().unwrap().state;
if state != State::Running && state != State::Retrying {
return false;
}
let code = *task
.code
.lock()
.unwrap()
.get(index)
.unwrap_or(&Reason::OthersError);
if code != Reason::Default {
error!(
"upload {} file fail, which reason is {}",
index, code.repr as u32
let response = task.client.request(request).await;
match response.as_ref() {
Ok(response) => {
let status_code = response.status();
info!(
"task {} get http response code {}",
task.conf.common_data.task_id, status_code,
);
if code != Reason::UserOperation {
task.change_task_status(State::Failed, code);
if status_code.is_server_error()
|| (status_code.as_u16() != 408 && status_code.is_client_error())
|| status_code.is_redirection()
{
return Err(TaskError::Failed(Reason::ProtocolError));
}
if status_code.as_u16() == 408 {
if task.tries.load(Ordering::SeqCst) < 2 {
task.tries.fetch_add(1, Ordering::SeqCst);
info!("task {} server timeout", task.task_id());
return Err(TaskError::Waiting(TaskPhase::NeedRetry));
} else {
info!("task {} retry 3 times", task.task_id());
return Err(TaskError::Failed(Reason::ProtocolError));
}
}
return false;
}
Err(e) => {
error!("Task {} {:?}", task.task_id(), e);
match e.error_kind() {
ErrorKind::Timeout => return Err(TaskError::Failed(Reason::ContinuousTaskTimeout)),
ErrorKind::Request => return Err(TaskError::Failed(Reason::RequestError)),
ErrorKind::Redirect => return Err(TaskError::Failed(Reason::RedirectError)),
ErrorKind::Connect | ErrorKind::ConnectionUpgrade => {
if task.tries.load(Ordering::SeqCst) < 2 {
task.tries.fetch_add(1, Ordering::SeqCst);
return Err(TaskError::Waiting(TaskPhase::NeedRetry));
}
info!("task {} retry 3 times", task.task_id());
if e.is_dns_error() {
return Err(TaskError::Failed(Reason::Dns));
} else if e.is_tls_error() {
return Err(TaskError::Failed(Reason::Ssl));
} else {
return Err(TaskError::Failed(Reason::Tcp));
}
}
ErrorKind::BodyTransfer => return task.handle_body_transfer_error().await,
ErrorKind::UserAborted => return Err(TaskError::Waiting(TaskPhase::UserAbort)),
_ => {
if format!("{}", e).contains("No space left on device") {
return Err(TaskError::Failed(Reason::InsufficientSpace));
} else {
return Err(TaskError::Failed(Reason::OthersError));
}
}
};
}
};
task.record_upload_response(index, response).await;
Ok(())
}
#[cfg(not(feature = "oh"))]
#[cfg(test)]
mod test {
use std::fs::File;
use std::io::{BufRead, BufReader, Write};
use std::net::{TcpListener, TcpStream};
use std::sync::Arc;
use crate::config::{Action, ConfigBuilder, Mode, TaskConfig};
use crate::info::State;
use crate::manage::Network;
use crate::task::request_task::{check_config, RequestTask, TaskError, TaskPhase};
use crate::task::upload::upload_inner;
fn build_task(config: TaskConfig) -> Arc<RequestTask> {
let (files, client) = check_config(&config).unwrap();
let network = Network::new();
let task = Arc::new(RequestTask::new(config, files, client, network));
task.status.lock().unwrap().state = State::Initialized;
task
}
fn init() {
let _ = env_logger::builder().is_test(true).try_init();
let _ = std::fs::create_dir("test_files/");
static ONCE: std::sync::Once = std::sync::Once::new();
ONCE.call_once(|| {
info!("server start 127.0.0.1:7878");
std::thread::spawn(|| {
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
for stream in listener.incoming() {
std::thread::sleep(std::time::Duration::from_secs(2));
let stream = stream.unwrap();
handle_connection(stream);
}
});
})
}
fn handle_connection(mut stream: TcpStream) {
let buf_reader = BufReader::new(&mut stream);
let http_request: Vec<_> = buf_reader
.lines()
.map(|result| result.unwrap())
.take_while(|line| !line.is_empty())
.collect();
debug!("http request: {:#?}", http_request);
let response = "HTTP/1.1 200 OK\r\n\r\n";
stream.write_all(response.as_bytes()).unwrap();
}
#[test]
fn ut_upload_basic() {
init();
let file_path = "test_files/ut_upload_basic.txt";
let file = File::options()
.read(true)
.write(true)
.create(true)
.open(file_path)
.unwrap();
file.set_len(100000).unwrap();
let config = ConfigBuilder::new()
.action(Action::Upload)
.method("POST")
.mode(Mode::BackGround)
.file_spec(file)
.url("http://127.0.0.1:7878/")
.redirect(true)
.build();
let task = build_task(config);
ylong_runtime::block_on(async {
upload_inner(task).await.unwrap();
})
}
}

View File

@ -40,12 +40,18 @@ impl From<&String> for CStringWrapper {
impl ToString for CStringWrapper {
fn to_string(&self) -> String {
if self.c_str.is_null() || self.len == 0 {
unsafe { DeleteChar(self.c_str) };
#[cfg(feature = "oh")]
unsafe {
DeleteChar(self.c_str)
};
return String::new();
}
let bytes = unsafe { slice::from_raw_parts(self.c_str as *const u8, self.len as usize) };
let str = unsafe { String::from_utf8_unchecked(bytes.to_vec()) };
unsafe { DeleteChar(self.c_str) };
#[cfg(feature = "oh")]
unsafe {
DeleteChar(self.c_str)
};
str
}
}
@ -104,7 +110,7 @@ impl FormItem {
}
}
#[link(name = "download_server_cxx", kind = "static")]
#[cfg(feature = "oh")]
extern "C" {
pub(crate) fn DeleteChar(ptr: *const c_char);
pub(crate) fn DeleteCFormItem(ptr: *const CFormItem);

View File

@ -12,7 +12,7 @@
// limitations under the License.
use std::fs::File;
use std::os::fd::{AsRawFd, RawFd};
use std::os::fd::{IntoRawFd, RawFd};
/// File Spec
#[derive(Clone, Debug)]
@ -33,14 +33,14 @@ pub struct FileSpec {
impl FileSpec {
/// Create a new file spec with user file.
pub fn user_file(f: &File) -> Self {
pub fn user_file(file: File) -> Self {
Self {
name: "".to_string(),
path: "".to_string(),
file_name: "".to_string(),
mime_type: "".to_string(),
is_user_file: true,
fd: Some(f.as_raw_fd()),
fd: Some(file.into_raw_fd()),
}
}
}

View File

@ -13,14 +13,17 @@
pub(crate) mod c_wrapper;
pub(crate) mod form_item;
pub(crate) mod task_id_generator;
pub(crate) mod url_policy;
use std::collections::HashMap;
use std::future::Future;
use std::io::Write;
use std::time::{SystemTime, UNIX_EPOCH};
pub(crate) use ffi::RequestTaskMsg;
cfg_oh! {
pub(crate) use ffi::RequestTaskMsg;
pub(crate) mod url_policy;
pub(crate) mod task_id_generator;
}
use ylong_runtime::sync::oneshot::Receiver;
use ylong_runtime::task::JoinHandle;
@ -96,6 +99,7 @@ pub(crate) fn runtime_spawn<F: Future<Output = ()> + Send + Sync + 'static>(
))
}
#[cfg(feature = "oh")]
pub(crate) fn query_app_state(uid: u64) -> ApplicationState {
let top_uid = query_top_uid();
match top_uid {
@ -110,6 +114,7 @@ pub(crate) fn query_app_state(uid: u64) -> ApplicationState {
}
}
#[cfg(feature = "oh")]
fn query_top_uid() -> Option<u64> {
let mut uid = 0;
for i in 0..10 {
@ -126,21 +131,25 @@ fn query_top_uid() -> Option<u64> {
None
}
#[cfg(feature = "oh")]
pub(crate) fn query_calling_bundle() -> String {
let token_id = ipc::Skeleton::calling_full_token_id();
ffi::GetCallingBundle(token_id)
}
#[cfg(feature = "oh")]
pub(crate) fn is_system_api() -> bool {
let token_id = ipc::Skeleton::calling_full_token_id();
ffi::IsSystemAPI(token_id)
}
#[cfg(feature = "oh")]
pub(crate) fn check_permission(permission: &str) -> bool {
let token_id = ipc::Skeleton::calling_full_token_id();
ffi::CheckPermission(token_id, permission)
}
#[cfg(feature = "oh")]
pub(crate) fn publish_state_change_event(
bundle_name: &str,
task_id: u32,
@ -152,6 +161,7 @@ pub(crate) fn publish_state_change_event(
}
}
#[cfg(feature = "oh")]
pub(crate) fn request_background_notify(
msg: RequestTaskMsg,
wrapped_path: &str,
@ -163,17 +173,14 @@ pub(crate) fn request_background_notify(
code => Err(code),
}
}
#[cxx::bridge(namespace = "OHOS::Request")]
mod ffi {
pub(crate) struct RequestTaskMsg {
struct RequestTaskMsg {
pub(crate) task_id: u32,
pub(crate) uid: i32,
pub(crate) action: u8,
}
unsafe extern "C++" {}
unsafe extern "C++" {
include!("request_utils.h");
@ -193,6 +200,7 @@ mod ffi {
}
}
#[cfg(feature = "oh")]
#[cfg(test)]
mod test {
use super::*;

View File

@ -21,7 +21,6 @@ pub(crate) fn check_url_domain(app_id: &str, domain_type: &str, url: &str) -> Op
}
}
#[link(name = "download_server_cxx", kind = "static")]
extern "C" {
pub(crate) fn PolicyCheckUrlDomain(
app_id: CStringWrapper,

View File

@ -44,7 +44,10 @@ ohos_rust_unittest("rust_request_ut_test") {
ohos_rust_unittest("rust_request_sdv_test") {
module_out_path = "request/request_rust"
rustflags = [ "--cfg=gn_test" ]
rustflags = [
"--cfg=gn_test",
"--cfg=feature=\"oh\"",
]
sources = [ "entry.rs" ]

View File

@ -11,6 +11,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
#![cfg(feature = "oh")]
use download_server::config::ConfigBuilder;
use test_common::test_init;
#[test]

View File

@ -11,8 +11,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
#![cfg(gn_test)]
mod construct;
mod resume;
mod search;
mod start;

53
services/tests/resume.rs Normal file
View File

@ -0,0 +1,53 @@
// Copyright (C) 2024 Huawei Device Co., Ltd.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#![cfg(feature = "oh")]
use std::fs::File;
use std::time::Duration;
use download_server::config::{Action, ConfigBuilder, Mode};
use test_common::test_init;
#[test]
fn sdv_start_resume() {
let file_path = "sdv_start_resume.txt";
let agent = test_init();
let file = File::create(file_path).unwrap();
let config = ConfigBuilder::new()
.action(Action::Download)
.mode(Mode::BackGround)
.file_spec(file)
.url("https://www.gitee.com/tiga-ultraman/downloadTests/releases/download/v1.01/test.txt")
.redirect(true)
.build();
let task_id = agent.construct(config);
agent.start(task_id);
agent.subscribe(task_id);
agent.pause(task_id);
std::thread::sleep(std::time::Duration::from_secs(1));
agent.resume(task_id);
ylong_runtime::block_on(async {
'main: loop {
let messages = agent.pop_task_info(task_id);
for message in messages {
message.check_correct();
if message.is_finished() {
break 'main;
}
}
ylong_runtime::time::sleep(Duration::from_secs(1)).await;
}
let file = File::open(file_path).unwrap();
assert_eq!(1042003, file.metadata().unwrap().len());
})
}

View File

@ -10,7 +10,7 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#![cfg(feature = "oh")]
use std::time::{SystemTime, UNIX_EPOCH};
use download_server::config::{Action, ConfigBuilder, Mode};

View File

@ -10,22 +10,23 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#![cfg(feature = "oh")]
use std::fs::File;
use std::time::Duration;
use download_server::config::{Action, ConfigBuilder, Mode};
use download_server::FileSpec;
use test_common::test_init;
#[test]
fn sdv_start_basic() {
let file_path = "sdv_network_resume.txt";
let agent = test_init();
let file = File::create("sdv_network_resume.txt").unwrap();
let file = File::create(file_path).unwrap();
let config = ConfigBuilder::new()
.action(Action::Download)
.mode(Mode::BackGround)
.file_spec(|v| v.push(FileSpec::user_file(&file)))
.file_spec(file)
.url("https://www.gitee.com/tiga-ultraman/downloadTests/releases/download/v1.01/test.txt")
.redirect(true)
.build();
@ -43,6 +44,7 @@ fn sdv_start_basic() {
}
ylong_runtime::time::sleep(Duration::from_secs(1)).await;
}
let file = File::open(file_path).unwrap();
assert_eq!(1042003, file.metadata().unwrap().len());
})
}

View File

@ -21,7 +21,7 @@ ohos_rust_static_library("rust_request_test_common") {
"./c:request_test",
"//third_party/rust/crates/once_cell:lib",
]
features = [ "oh" ]
external_deps = [
"ipc:ipc_rust",
"safwk:system_ability_fwk_rust",

View File

@ -16,11 +16,20 @@ name = "test_common"
version = "0.1.0"
edition = "2021"
[dependencies]
[features]
default = []
oh = [
"samgr",
"ipc",
"system_ability_fwk",
"download_server",
]
[dependencies]
ylong_runtime = { git = "https://gitee.com/openharmony/commonlibrary_rust_ylong_runtime", features = ["full"] }
download_server = { path = "../../services/" }
system_ability_fwk = { git = "https://gitee.com/openharmony/systemabilitymgr_safwk" }
samgr = { git = "https://gitee.com/openharmony/systemabilitymgr_samgr" }
ipc = { git = "https://gitee.com/openharmony/communication_ipc" }
download_server = { path = "../../services/", optional = true }
system_ability_fwk = { git = "https://gitee.com/openharmony/systemabilitymgr_safwk", optional = true }
samgr = { git = "https://gitee.com/openharmony/systemabilitymgr_samgr", optional = true }
ipc = { git = "https://gitee.com/openharmony/communication_ipc", optional = true }
once_cell = "1.17.0"

View File

@ -25,6 +25,7 @@
// limitations under the License.
#![allow(unused)]
#![allow(missing_docs)]
#![cfg(feature = "oh")]
use std::collections::HashMap;
use std::ffi::{c_char, CString};
@ -122,6 +123,31 @@ impl RequestAgent {
assert_eq!(ret, 0);
}
pub fn pause(&self, task_id: u32) {
let mut data = MsgParcel::new();
data.write_interface_token(SERVICE_TOKEN).unwrap();
data.write(&0u32);
data.write(&format!("{}", task_id)).unwrap();
let mut reply = self
.remote
.send_request(interface::PAUSE, &mut data)
.unwrap();
let ret: i32 = reply.read().unwrap();
assert_eq!(ret, 0);
}
pub fn resume(&self, task_id: u32) {
let mut data = MsgParcel::new();
data.write_interface_token(SERVICE_TOKEN).unwrap();
data.write(&format!("{}", task_id)).unwrap();
let mut reply = self
.remote
.send_request(interface::RESUME, &mut data)
.unwrap();
let ret: i32 = reply.read().unwrap();
assert_eq!(ret, 0);
}
pub fn search(
&self,
before: i64,