predownload 功能初步开发

Signed-off-by: fqwert <yanglv2@huawei.com>
Change-Id: I0491aeb84062168cdcd1a0a6ee928b45266d04f9
This commit is contained in:
fqwert 2024-10-31 17:30:08 +08:00
parent 376413f705
commit 5e070bd885
47 changed files with 2066 additions and 582 deletions

View File

@ -11,4 +11,33 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
*/
#ifndef REQUEST_PRE_DOWNLOAD_H
#define REQUEST_PRE_DOWNLOAD_H
#include <memory>
namespace OHOS::Request {
struct DownloadAgent;
class PreDownloadCallback {
public:
PreDownloadCallback() = default;
virtual ~PreDownloadCallback();
virtual void OnSuccess() const = 0;
virtual void OnFail() const = 0;
virtual void OnCancel() const = 0;
};
class PreDownloadAgent {
public:
PreDownloadAgent();
private:
static DownloadAgent &_agent;
};
} // namespace OHOS::Request
#endif // REQUEST_PRE_DOWNLOAD_H

View File

@ -0,0 +1,89 @@
# Copyright (C) 2024 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import("//build/ohos.gni")
import("//build/test.gni")
rust_cxx("ffrt_rs_cxx_gen") {
sources = [ "src/wrapper.rs" ]
}
ohos_static_library("ffrt_rs_cxx") {
sanitize = {
integer_overflow = true
ubsan = true
boundary_sanitize = true
cfi = true
cfi_cross_dso = true
debug = false
}
stack_protector_ret = true
include_dirs = [
"include",
"${target_gen_dir}/src",
]
sources = [ "src/cxx/wrapper.cpp" ]
sources += get_target_outputs(":ffrt_rs_cxx_gen")
external_deps = [ "ffrt:libffrt" ]
deps = [
":ffrt_rs_cxx_gen",
"//third_party/rust/crates/cxx:cxx_cppdeps",
]
subsystem_name = "request"
part_name = "request"
}
ohos_rust_static_library("ffrt_rs") {
sanitize = {
integer_overflow = true
ubsan = true
boundary_sanitize = true
cfi = true
cfi_cross_dso = true
debug = false
}
sources = [ "src/lib.rs" ]
external_deps = [ "ffrt:libffrt" ]
deps = [
":ffrt_rs_cxx",
"//third_party/rust/crates/cxx:lib",
]
subsystem_name = "request"
part_name = "request"
}
ohos_rust_unittest("rust_request_ffrt_rs_ut_test") {
module_out_path = "request/predownload"
sources = [ "src/lib.rs" ]
external_deps = []
deps = [
":ffrt_rs_cxx",
"//third_party/rust/crates/cxx:lib",
]
external_deps = [ "ffrt:libffrt" ]
subsystem_name = "request"
part_name = "request"
}

View File

@ -0,0 +1,21 @@
# Copyright (C) 2024 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
[package]
name = "ffrt_rs"
version = "0.1.0"
edition = "2021"
[dependencies]
cxx = "1.0.115"
request_utils = { path = "../../request_utils" }

View File

@ -0,0 +1,27 @@
/*
* Copyright (C) 2024 Huawei Device Co., Ltd.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef REQUEST_FFRT_WRAPPER_H
#define REQUEST_FFRT_WRAPPER_H
#include "cpp/task.h"
#include "cxx.h"
#include "ffrt.h"
struct ClosureWrapper;
void FfrtSpawn(rust::Box<ClosureWrapper> closure);
#endif

View File

@ -0,0 +1,27 @@
/*
* Copyright (C) 2024 Huawei Device Co., Ltd.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "wrapper.h"
#include "cxx.h"
#include "wrapper.rs.h"
void FfrtSpawn(rust::Box<ClosureWrapper> closure)
{
ffrt::submit([closure = closure.into_raw()]() mutable {
closure->run();
rust::Box<ClosureWrapper>::from_raw(closure);
});
}

View File

@ -0,0 +1,44 @@
// Copyright (C) 2024 Huawei Device Co., Ltd.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#![allow(missing_docs)]
#![allow(unused)]
mod wrapper;
use wrapper::{ClosureWrapper, FfrtSpawn};
pub fn ffrt_spawn<F>(f: F)
where
F: FnOnce() + 'static,
{
FfrtSpawn(ClosureWrapper::new(f));
}
#[cfg(test)]
mod tests {
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use super::*;
#[test]
fn test_spawn() {
let flag = Arc::new(AtomicUsize::new(0));
let flag_clone = flag.clone();
ffrt_spawn(move || {
flag_clone.fetch_add(1, Ordering::SeqCst);
});
std::thread::sleep(std::time::Duration::from_millis(100));
assert_eq!(flag.load(Ordering::SeqCst), 1);
}
}

View File

@ -0,0 +1,50 @@
// Copyright (C) 2024 Huawei Device Co., Ltd.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::any::Any;
pub(crate) use ffi::FfrtSpawn;
pub struct ClosureWrapper {
inner: Option<Box<dyn FnOnce()>>,
}
impl ClosureWrapper {
pub fn new<F>(f: F) -> Box<Self>
where
F: FnOnce() + 'static,
{
Box::new(Self {
inner: Some(Box::new(f)),
})
}
pub fn run(&mut self) {
if let Some(f) = self.inner.take() {
f();
}
}
}
#[cxx::bridge]
mod ffi {
extern "Rust" {
type ClosureWrapper;
fn run(self: &mut ClosureWrapper);
}
unsafe extern "C++" {
include!("wrapper.h");
fn FfrtSpawn(closure: Box<ClosureWrapper>);
}
}

View File

@ -13,7 +13,7 @@
import("//build/ohos.gni")
ohos_shared_library("request_predownload") {
ohos_shared_library("predownload") {
sanitize = {
integer_overflow = true
ubsan = true
@ -26,12 +26,30 @@ ohos_shared_library("request_predownload") {
include_dirs = [ "include" ]
sources = [ "src/a.cpp" ]
sources = [
"src/pre_download_module.cpp",
"src/predownload.cpp",
]
deps = [
"../../native:request_predownload_native",
"../../native:predownload_native",
"//third_party/rust/crates/cxx:cxx_cppdeps",
]
external_deps = [
"ability_base:zuri",
"ability_runtime:abilitykit_native",
"ability_runtime:app_context",
"ability_runtime:data_ability_helper",
"ability_runtime:extensionkit_native",
"ability_runtime:napi_base_context",
"c_utils:utils",
"curl:curl_shared",
"hilog:libhilog",
"init:libbegetutil",
"napi:ace_napi",
]
relative_install_dir = "module"
subsystem_name = "request"
part_name = "request"
}

View File

@ -0,0 +1,24 @@
/*
* Copyright (C) 2024 Huawei Device Co., Ltd.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef REQUEST_PRE_DOWNLOAD_H
#define REQUEST_PRE_DOWNLOAD_H
#include "base/request/request/common/include/log.h"
#include "napi/native_api.h"
#include "napi/native_common.h"
#include "napi/native_node_api.h"
#endif

View File

@ -13,10 +13,13 @@
* limitations under the License.
*/
#include "a.h"
#ifndef REQUEST_PRE_DOWNLOAD_H
#define REQUEST_PRE_DOWNLOAD_H
#include "download/wrapper.rs.h"
#include "pre_download.h"
namespace OHOS::Request {
} // namespace OHOS::Request
} // namespace OHOS::Request
#endif // REQUEST_PRE_DOWNLOAD_H

View File

@ -0,0 +1,54 @@
/*
* Copyright (C) 2024 Huawei Device Co., Ltd.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "pre_download_module.h"
napi_value preDownload(napi_env env, napi_callback_info info)
{
size_t argc = 1;
napi_value url[1] = { nullptr };
NAPI_CALL(env, napi_get_cb_info(env, info, &argc, url, nullptr, nullptr));
napi_valuetype valuetype0;
NAPI_CALL(env, napi_typeof(env, url[0], &valuetype0));
if (valuetype0 != napi_string) {
REQUEST_HILOGE("ERROR");
return nullptr;
}
return nullptr;
}
static napi_value Init(napi_env env, napi_value exports)
{
napi_property_descriptor desc[]{
DECLARE_NAPI_METHOD("preDownload", preDownload),
};
napi_define_properties(env, exports, sizeof(desc) / sizeof(napi_property_descriptor), desc);
return exports;
}
static __attribute__((constructor)) void RegisterModule()
{
static napi_module module = { .nm_version = 1,
.nm_flags = 0,
.nm_filename = nullptr,
.nm_register_func = Init,
.nm_modname = "predownload",
.nm_priv = ((void *)0),
.reserved = { 0 } };
napi_module_register(&module);
}

View File

@ -13,9 +13,3 @@
* limitations under the License.
*/
#include "pre_download.h"
#include "download/wrapper.rs.h"
namespace OHOS::Request {
} // namespace OHOS::Request

View File

@ -13,11 +13,11 @@
import("//build/ohos.gni")
import("//build/test.gni")
rust_cxx("request_predownload_native_rust_gen") {
sources = [ "src/download/wrapper.rs" ]
rust_cxx("predownload_native_rust_gen") {
sources = [ "src/wrapper.rs" ]
}
ohos_rust_static_ffi("request_predownload_native_rust") {
ohos_rust_static_ffi("predownload_native_rust") {
sanitize = {
integer_overflow = true
ubsan = true
@ -31,6 +31,7 @@ ohos_rust_static_ffi("request_predownload_native_rust") {
sources = [ "src/lib.rs" ]
deps = [
"../../request_utils:request_utils",
"../ffrt_rs:ffrt_rs",
"../netstack_rs:netstack_rs",
"//third_party/rust/crates/cxx:lib",
]
@ -40,7 +41,7 @@ ohos_rust_static_ffi("request_predownload_native_rust") {
part_name = "request"
}
config("request_predownload_native_config") {
config("predownload_native_config") {
include_dirs = [
"include",
"${target_gen_dir}/src",
@ -48,7 +49,7 @@ config("request_predownload_native_config") {
]
}
ohos_shared_library("request_predownload_native") {
ohos_shared_library("predownload_native") {
sanitize = {
integer_overflow = true
ubsan = true
@ -59,21 +60,23 @@ ohos_shared_library("request_predownload_native") {
}
stack_protector_ret = true
public_configs = [ ":request_predownload_native_config" ]
public_configs = [ ":predownload_native_config" ]
sources = [ "src/cxx/pre_download.cpp" ]
sources += get_target_outputs(":request_predownload_native_rust_gen")
sources = [ "src/cxx/request_pre_download.cpp" ]
sources += get_target_outputs(":predownload_native_rust_gen")
deps = [
":request_predownload_native_rust",
":request_predownload_native_rust_gen",
":predownload_native_rust",
":predownload_native_rust_gen",
"//third_party/rust/crates/cxx:cxx_cppdeps",
]
innerapi_tags = [ "platformsdk" ]
subsystem_name = "request"
part_name = "request"
}
ohos_static_library("request_predownload_native_static") {
ohos_static_library("predownload_native_static") {
sanitize = {
integer_overflow = true
ubsan = true
@ -84,12 +87,12 @@ ohos_static_library("request_predownload_native_static") {
}
stack_protector_ret = true
public_configs = [ ":request_predownload_native_config" ]
public_configs = [ ":predownload_native_config" ]
sources = [ "src/cxx/pre_download.cpp" ]
sources += get_target_outputs(":request_predownload_native_rust_gen")
sources = [ "src/cxx/request_pre_download.cpp" ]
sources += get_target_outputs(":predownload_native_rust_gen")
deps = [
":request_predownload_native_rust_gen",
":predownload_native_rust_gen",
"//third_party/rust/crates/cxx:cxx_cppdeps",
]
@ -97,7 +100,7 @@ ohos_static_library("request_predownload_native_static") {
part_name = "request"
}
ohos_rust_unittest("rust_request_predownload_native_ut_test") {
ohos_rust_unittest("rust_predownload_native_ut_test") {
module_out_path = "request/predownload"
sources = [ "src/lib.rs" ]
@ -107,8 +110,9 @@ ohos_rust_unittest("rust_request_predownload_native_ut_test") {
rustflags = [ "--cfg=feature=\"ohos\"" ]
deps = [
":request_predownload_native_static",
":predownload_native_static",
"../../request_utils:request_utils",
"../ffrt_rs:ffrt_rs",
"../netstack_rs:netstack_rs",
"//third_party/rust/crates/cxx:lib",
]

View File

@ -12,19 +12,29 @@
# limitations under the License.
[package]
name = "request_predownload_native"
name = "predownload_native"
version = "0.1.0"
edition = "2021"
[features]
default = ["ohos"]
default = []
ohos = [
"netstack_rs",
"request_utils",
"request_utils/ohos",
"ffrt_rs",
]
[dependencies]
cxx = "1.0.115"
ylong_runtime = { git = "https://gitee.com/openharmony/commonlibrary_rust_ylong_runtime", features = ["full"] }
ylong_http_client = { git = "https://gitee.com/openharmony/commonlibrary_rust_ylong_http", features = [
"async",
"c_openssl_3_0",
"http1_1",
"ylong_base",
] }
request_utils = { path = "../../request_utils" }
ffrt_rs = { path = "../ffrt_rs", optional = true }
netstack_rs = { path = "../netstack_rs", optional = true }
request_utils = { path = "../../request_utils", optional = true }
env_logger = "0.11.3"

View File

@ -21,20 +21,26 @@
#include "cxx.h"
namespace OHOS::Request {
struct TaskHandle;
struct PreDownloadOptions;
struct DownloadAgent;
class PreDownloadTaskCallback {
class PreDownloadCallback {
public:
PreDownloadTaskCallback() = default;
virtual ~PreDownloadTaskCallback();
PreDownloadCallback() = default;
virtual ~PreDownloadCallback();
virtual void OnSuccess() const = 0;
virtual void OnFail() const = 0;
virtual void OnCancel() const = 0;
};
class PreDownloadAgent {
public:
PreDownloadAgent();
void preDownload(std::string url, std::unique_ptr<PreDownloadCallback> callback) const;
private:
DownloadAgent *_agent;
};
} // namespace OHOS::Request
#endif // REQUEST_PRE_DOWNLOAD_H

View File

@ -0,0 +1,311 @@
// Copyright (C) 2024 Huawei Device Co., Ltd.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::sync::{Arc, LazyLock, Mutex};
use request_utils::fastrand::fast_random;
use crate::cache::{Cache, CacheManager, Fetcher, Updater};
use crate::download::CancelHandle;
cfg_ohos! {
use crate::wrapper::ffi::PreDownloadCallback;
use crate::wrapper::FfiCallback;
}
#[allow(unused_variables)]
pub trait CustomCallback: Send {
fn on_success(&mut self, data: Arc<Cache>) {}
fn on_fail(&mut self, error: &str) {}
fn on_cancel(&mut self) {}
}
pub struct DownloadAgent {
tasks: Mutex<HashMap<String, u64>>,
running_tasks: Mutex<HashMap<u64, Updater>>,
}
impl DownloadAgent {
fn new() -> Self {
Self {
tasks: Mutex::new(HashMap::new()),
running_tasks: Mutex::new(HashMap::new()),
}
}
pub fn get_instance() -> &'static Self {
static CACHE_MANAGER: LazyLock<DownloadAgent> = LazyLock::new(DownloadAgent::new);
&CACHE_MANAGER
}
pub fn cancel(&self, url: String) {
if let Some(task_id) = self.tasks.lock().unwrap().get(&url) {
if let Some(updater) = self.running_tasks.lock().unwrap().get_mut(task_id) {
updater.cancel();
}
}
}
pub fn remove(&self, url: String) {
if let Some(task_id) = self.tasks.lock().unwrap().remove(&url) {
self.running_tasks.lock().unwrap().remove(&task_id);
}
}
pub fn pre_download(
&self,
url: String,
mut callback: Box<dyn CustomCallback>,
update: bool,
) -> Option<CancelHandle> {
let mut tasks = self.tasks.lock().unwrap();
let mut running_tasks = self.running_tasks.lock().unwrap();
if let Some(task_id) = tasks.get(&url) {
info!("task {} exist", task_id);
if let Some(updater) = running_tasks.get_mut(task_id) {
if let Err(ret) = updater.try_add_callback(callback) {
info!("task {} completed", task_id);
callback = ret;
} else {
info!("task {} add callback success", task_id);
return Some(updater.cancel_handle());
}
}
if !update {
if let Err(ret) = self.fetch(task_id, callback) {
error!("{} fetch fail", task_id);
callback = ret;
} else {
info!("{} fetch success", task_id);
return None;
}
}
}
let task_id = fast_random();
info!("new pre_download task {}", task_id);
let updater = Updater::new(task_id, &url, callback);
let handle = updater.cancel_handle();
tasks.insert(url, task_id);
running_tasks.insert(task_id, updater);
Some(handle)
}
#[cfg(feature = "ohos")]
pub(crate) fn ffi_pre_download(
&self,
url: String,
callback: cxx::UniquePtr<PreDownloadCallback>,
update: bool,
) {
let Some(callback) = FfiCallback::from_ffi(callback) else {
error!("ffi_pre_download callback is null");
return;
};
let _ = self.pre_download(url, Box::new(callback), update);
}
fn fetch(
&self,
task_id: &u64,
callback: Box<dyn CustomCallback>,
) -> Result<(), Box<dyn CustomCallback>> {
let fetcher = Fetcher::new(*task_id);
fetcher.fetch_with_callback(callback)
}
}
#[cfg(test)]
mod test {
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;
use super::*;
use crate::{init, TEST_URL};
const ERROR_IP: &str = "127.12.31.12";
struct TestCallbackN;
impl CustomCallback for TestCallbackN {}
struct TestCallbackS {
flag: Arc<AtomicUsize>,
}
impl CustomCallback for TestCallbackS {
fn on_success(&mut self, data: Arc<Cache>) {
if data.size() != 0 {
self.flag.fetch_add(1, Ordering::SeqCst);
}
}
}
struct TestCallbackF {
flag: Arc<Mutex<String>>,
}
impl CustomCallback for TestCallbackF {
fn on_fail(&mut self, error: &str) {
self.flag.lock().unwrap().push_str(error);
}
}
struct TestCallbackC {
flag: Arc<AtomicUsize>,
}
impl CustomCallback for TestCallbackC {
fn on_cancel(&mut self) {
self.flag.fetch_add(1, Ordering::SeqCst);
}
}
#[test]
fn ut_pre_download_success() {
init();
let agent = DownloadAgent::new();
let success_flag = Arc::new(AtomicUsize::new(0));
let callback = Box::new(TestCallbackS {
flag: success_flag.clone(),
});
agent.pre_download(TEST_URL.to_string(), callback, false);
std::thread::sleep(Duration::from_secs(1));
assert_eq!(success_flag.load(Ordering::SeqCst), 1);
}
#[test]
fn ut_pre_download_success_add_callback() {
init();
let agent = DownloadAgent::new();
let success_flag_0 = Arc::new(AtomicUsize::new(0));
let callback_0 = Box::new(TestCallbackS {
flag: success_flag_0.clone(),
});
let success_flag_1 = Arc::new(AtomicUsize::new(0));
let callback_1 = Box::new(TestCallbackS {
flag: success_flag_1.clone(),
});
agent.pre_download(TEST_URL.to_string(), callback_0, false);
agent.pre_download(TEST_URL.to_string(), callback_1, false);
std::thread::sleep(Duration::from_secs(1));
assert_eq!(success_flag_0.load(Ordering::SeqCst), 1);
assert_eq!(success_flag_1.load(Ordering::SeqCst), 1);
}
#[test]
fn ut_pre_download_fail() {
init();
let agent = DownloadAgent::new();
let error = Arc::new(Mutex::new(String::new()));
let callback = Box::new(TestCallbackF {
flag: error.clone(),
});
agent.pre_download(ERROR_IP.to_string(), callback, false);
std::thread::sleep(Duration::from_secs(1));
assert!(!error.lock().unwrap().as_str().is_empty());
}
#[test]
fn ut_pre_download_fail_add_callback() {
init();
let agent = DownloadAgent::new();
let error_0 = Arc::new(Mutex::new(String::new()));
let callback_0 = Box::new(TestCallbackF {
flag: error_0.clone(),
});
let error_1 = Arc::new(Mutex::new(String::new()));
let callback_1 = Box::new(TestCallbackF {
flag: error_1.clone(),
});
agent.pre_download(ERROR_IP.to_string(), callback_0, false);
agent.pre_download(ERROR_IP.to_string(), callback_1, false);
std::thread::sleep(Duration::from_secs(1));
assert!(!error_0.lock().unwrap().as_str().is_empty());
assert!(!error_1.lock().unwrap().as_str().is_empty());
}
#[test]
fn ut_pre_download_cancel() {
init();
let agent = DownloadAgent::new();
let cancel_flag = Arc::new(AtomicUsize::new(0));
let callback = Box::new(TestCallbackC {
flag: cancel_flag.clone(),
});
let mut handle = agent
.pre_download(TEST_URL.to_string(), callback, false)
.unwrap();
handle.cancel();
std::thread::sleep(Duration::from_secs(1));
assert_eq!(cancel_flag.load(Ordering::SeqCst), 1);
let cancel_flag = Arc::new(AtomicUsize::new(0));
let callback = Box::new(TestCallbackC {
flag: cancel_flag.clone(),
});
agent
.pre_download(TEST_URL.to_string(), callback, false)
.unwrap();
agent.cancel(TEST_URL.to_string());
std::thread::sleep(Duration::from_secs(1));
assert_eq!(cancel_flag.load(Ordering::SeqCst), 1);
}
#[test]
fn ut_pre_download_cancel_add_callback() {
init();
let agent = DownloadAgent::new();
let cancel_flag_0 = Arc::new(AtomicUsize::new(0));
let callback_0 = Box::new(TestCallbackC {
flag: cancel_flag_0.clone(),
});
let cancel_flag_1 = Arc::new(AtomicUsize::new(0));
let callback_1 = Box::new(TestCallbackC {
flag: cancel_flag_1.clone(),
});
let mut handle_0 = agent
.pre_download(TEST_URL.to_string(), callback_0, false)
.unwrap();
agent
.pre_download(TEST_URL.to_string(), callback_1, false)
.unwrap();
handle_0.cancel();
std::thread::sleep(Duration::from_secs(1));
assert_eq!(cancel_flag_0.load(Ordering::SeqCst), 1);
assert_eq!(cancel_flag_1.load(Ordering::SeqCst), 1);
}
#[test]
fn ut_pre_download_already_success() {
init();
let agent = DownloadAgent::new();
agent.pre_download(TEST_URL.to_string(), Box::new(TestCallbackN), false);
std::thread::sleep(Duration::from_secs(1));
let success_flag = Arc::new(AtomicUsize::new(0));
let callback = Box::new(TestCallbackS {
flag: success_flag.clone(),
});
agent.pre_download(TEST_URL.to_string(), callback, false);
std::thread::sleep(Duration::from_millis(500));
assert_eq!(success_flag.load(Ordering::SeqCst), 1);
}
}

View File

@ -13,179 +13,89 @@
use std::fs::{File, OpenOptions};
use std::io::{self, Cursor, Read, Seek, Write};
use std::sync::Arc;
pub(crate) struct Cache {
data: CacheData,
known_size: bool,
size: Option<usize>,
}
use super::CacheManager;
enum CacheData {
Ram(Vec<u8>),
File(File),
pub struct Cache {
task_id: u64,
data: Vec<u8>,
ram_applied: bool,
}
impl Cache {
pub(super) fn new_ram(size: Option<usize>) -> Self {
if let Some(size) = size {
Self {
data: CacheData::Ram(Vec::with_capacity(size)),
known_size: true,
size: Some(size),
}
} else {
Self {
data: CacheData::Ram(Vec::new()),
known_size: false,
size: None,
}
pub(super) fn new(task_id: u64, applied_size: Option<usize>) -> Self {
let (data, ram_applied) = match applied_size {
Some(size) => (Vec::with_capacity(size), true),
None => (Vec::new(), false),
};
Self {
task_id,
data,
ram_applied,
}
}
pub(crate) fn reader(&self) -> CacheReader {
match &self.data {
CacheData::Ram(v) => CacheReader {
inner: ReaderData::Cursor(Cursor::new(v)),
},
CacheData::File(f) => {
let mut file = f.try_clone().unwrap();
file.rewind().unwrap();
CacheReader {
inner: ReaderData::File(file),
}
}
pub(crate) fn cursor(&self) -> Cursor<&[u8]> {
Cursor::new(&self.data)
}
pub(crate) fn complete_write(self) -> Arc<Self> {
if !self.ram_applied {
CacheManager::get_instance().apply_ram_size(self.data.len());
}
let me = Arc::new(self);
CacheManager::get_instance().update_cache(me.task_id, me.clone());
me
}
pub(crate) fn update_cache_size(&mut self) -> bool {
if self.size.is_none() {
self.size = Some(self.data.size());
true
} else {
false
}
}
pub(super) fn turn_to_file(&mut self) -> bool {
todo!()
}
pub(super) fn is_ram(&self) -> bool {
match &self.data {
CacheData::Ram(_) => true,
CacheData::File(_) => false,
}
}
pub(super) fn is_valid(&self) -> bool {
if let Some(size) = self.size {
return size == self.data.size();
}
false
}
pub(super) fn size(&self) -> Option<usize> {
self.size
}
pub(super) fn known_size(&self) -> bool {
self.known_size
}
pub(super) fn create_file_cache(&self, path: &str) -> Result<Cache, io::Error> {
pub(super) fn create_file_cache(&self, task_id: u64) -> Result<File, io::Error> {
let mut file = OpenOptions::new()
.write(true)
.read(true)
.create(true)
.truncate(true)
.open(path)?;
io::copy(&mut self.reader(), &mut file);
.open(task_id.to_string())?;
io::copy(&mut self.cursor(), &mut file)?;
file.rewind()?;
Ok(Cache {
data: CacheData::File(file),
known_size: self.known_size,
size: self.size,
})
Ok(file)
}
}
impl CacheData {
fn size(&self) -> usize {
match self {
CacheData::Ram(v) => v.len(),
CacheData::File(f) => f.metadata().unwrap().len() as usize,
}
pub(crate) fn size(&self) -> usize {
self.data.len()
}
}
impl Write for Cache {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
match self.data {
CacheData::Ram(ref mut v) => v.write(buf),
CacheData::File(ref mut f) => f.write(buf),
}
self.data.write(buf)
}
fn flush(&mut self) -> std::io::Result<()> {
match self.data {
CacheData::Ram(ref mut v) => v.flush(),
CacheData::File(ref mut f) => f.flush(),
}
}
}
pub(crate) struct CacheReader<'a> {
inner: ReaderData<'a>,
}
enum ReaderData<'a> {
Cursor(Cursor<&'a [u8]>),
File(File),
}
impl<'a> Read for CacheReader<'a> {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
match &mut self.inner {
ReaderData::Cursor(c) => c.read(buf),
ReaderData::File(f) => f.read(buf),
}
}
}
impl<'a> Seek for CacheReader<'a> {
fn seek(&mut self, pos: std::io::SeekFrom) -> std::io::Result<u64> {
match &mut self.inner {
ReaderData::Cursor(c) => c.seek(pos),
ReaderData::File(f) => f.seek(pos),
}
self.data.flush()
}
}
#[cfg(test)]
mod test {
use std::fs::read;
use request_utils::fastrand::fast_random;
use super::*;
const TEST_URL: &str = "小心猴子";
const TEST_STRING: &str = "你这猴子真让我欢喜";
#[test]
fn ut_cache_write_read() {
let mut cache = Cache::new_ram(Some(TEST_STRING.len()));
let task_id = fast_random();
let mut cache = Cache::new(task_id, Some(TEST_STRING.len()));
cache.write_all(TEST_STRING.as_bytes()).unwrap();
let mut buf = String::new();
cache.reader().read_to_string(&mut buf);
cache.cursor().read_to_string(&mut buf).unwrap();
assert_eq!(buf, TEST_STRING);
let mut buf = String::new();
cache.reader().read_to_string(&mut buf);
assert_eq!(buf, TEST_STRING);
let new_cache = cache.create_file_cache(TEST_URL).unwrap();
assert!(new_cache.is_valid());
let mut buf = String::new();
new_cache.reader().read_to_string(&mut buf).unwrap();
cache.cursor().read_to_string(&mut buf).unwrap();
assert_eq!(buf, TEST_STRING);
}
}

View File

@ -11,10 +11,33 @@
// See the License for the specific language governing permissions and
// limitations under the License.
struct Fetcher {}
use super::CacheManager;
use crate::agent::CustomCallback;
pub(crate) struct Fetcher {
task_id: u64,
}
impl Fetcher {
fn new() -> Self {
Self {}
pub(crate) fn new(task_id: u64) -> Self {
Self { task_id }
}
pub(crate) fn fetch_with_callback(
&self,
mut callback: Box<dyn CustomCallback>,
) -> Result<(), Box<dyn CustomCallback>> {
if let Some(cache) = CacheManager::get_instance().get_cache(self.task_id) {
#[cfg(feature = "ohos")]
ffrt_rs::ffrt_spawn(move || {
callback.on_success(cache);
});
#[cfg(not(feature = "ohos"))]
std::thread::spawn(move || {
callback.on_success(cache);
});
Ok(())
} else {
Err(callback)
}
}
}

View File

@ -11,33 +11,46 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::hash_map::Entry;
use std::collections::{HashMap, VecDeque};
use std::fs::File;
use std::io::{BufRead, Cursor, Read, Seek, Write};
use std::io::BufReader;
use std::mem::MaybeUninit;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, LazyLock, Mutex, Once, RwLock};
use std::thread;
use std::os::unix::fs::MetadataExt;
use std::sync::{Arc, LazyLock, Mutex, Once, OnceLock};
use std::time::Duration;
use std::{io, thread};
use request_utils::queue_map::QueueMap;
use super::data::Cache;
use crate::spawn;
const DEFAULT_RAM_CACHE_SIZE: usize = 1024 * 1024 * 100;
const DEFAULT_FILE_CACHE_SIZE: usize = 1024 * 1024 * 100;
pub(crate) struct CacheManager {
caches: Mutex<HashMap<String, Arc<Cache>>>,
ram_cache_queue: Mutex<VecDeque<String>>,
file_cache_queue: Mutex<VecDeque<String>>,
handler: Mutex<Handler>,
rams: Mutex<QueueMap<u64, Arc<Cache>>>,
backup_rams: Mutex<HashMap<u64, Arc<Cache>>>,
files: Mutex<QueueMap<u64, File>>,
ram_once: Mutex<HashMap<u64, Arc<OnceLock<Option<Arc<Cache>>>>>>,
ram_handle: Mutex<Handle>,
}
struct RamHandle {
ram_caches: Mutex<HashMap<u64, Arc<Cache>>>,
}
impl CacheManager {
fn new() -> Self {
Self {
caches: Mutex::new(HashMap::new()),
ram_cache_queue: Mutex::new(VecDeque::new()),
file_cache_queue: Mutex::new(VecDeque::new()),
handler: Mutex::new(Handler::new(DEFAULT_RAM_CACHE_SIZE)),
rams: Mutex::new(QueueMap::new()),
files: Mutex::new(QueueMap::new()),
backup_rams: Mutex::new(HashMap::new()),
ram_once: Mutex::new(HashMap::new()),
ram_handle: Mutex::new(Handle::new(DEFAULT_RAM_CACHE_SIZE)),
}
}
@ -46,88 +59,113 @@ impl CacheManager {
&CACHE_MANAGER
}
pub(crate) fn apply_for_cache(&self, size: Option<usize>) -> Cache {
if let Some(size) = size {
self.apply_ram_size(size);
pub(crate) fn apply_for_cache(
&self,
task_id: u64,
applied_size: Option<usize>,
) -> Result<Cache, ()> {
if let Some(size) = applied_size {
if !self.apply_ram_size(size) {
return Err(());
}
}
Cache::new_ram(size)
Ok(Cache::new(task_id, applied_size))
}
pub(crate) fn update_cache(&self, url: String, cache: Cache) -> bool {
if !cache.is_valid() {
return false;
}
let cache = Arc::new(cache);
if let Some(old_cache) = self
.caches
.lock()
.unwrap()
.insert(url.clone(), cache.clone())
{
pub(crate) fn update_cache(&self, task_id: u64, cache: Arc<Cache>) {
if let Some(old_cache) = self.rams.lock().unwrap().insert(task_id, cache.clone()) {
self.release_cache(old_cache);
}
info!("{} ram updated", task_id);
self.ram_once.lock().unwrap().remove(&task_id);
info!("{} ram once removed", task_id);
self.backup_rams
.lock()
.unwrap()
.insert(task_id, cache.clone());
info!("{} ram backup updated", task_id);
spawn(move || {
let file = cache.create_file_cache(task_id).unwrap();
CacheManager::get_instance().update_file(task_id, file);
});
}
if cache.is_ram() {
if !cache.known_size() {
let size = cache.size().unwrap();
self.apply_ram_size(size);
pub(crate) fn get_cache(&self, task_id: u64) -> Option<Arc<Cache>> {
let res = self.rams.lock().unwrap().get(&task_id).cloned();
res.or_else(|| self.backup_rams.lock().unwrap().get(&task_id).cloned())
.or_else(|| self.update_ram_from_file(task_id))
}
fn update_file(&self, task_id: u64, file: File) {
info!("{} file updated", task_id);
self.files.lock().unwrap().insert(task_id, file);
self.backup_rams.lock().unwrap().remove(&task_id);
}
fn update_ram_from_file(&self, task_id: u64) -> Option<Arc<Cache>> {
info!("{} ram updated from file", task_id);
let once = match self.ram_once.lock().unwrap().entry(task_id) {
Entry::Occupied(entry) => entry.into_mut().clone(),
Entry::Vacant(entry) => {
let res = self.rams.lock().unwrap().get(&task_id).cloned();
let res = res.or_else(|| self.backup_rams.lock().unwrap().get(&task_id).cloned());
if res.is_some() {
return res;
} else {
entry.insert(Arc::new(OnceLock::new())).clone()
}
}
self.ram_cache_queue.lock().unwrap().push_back(url);
} else {
self.file_cache_queue.lock().unwrap().push_back(url);
}
};
true
let mut ram_once = self.ram_once.lock().unwrap();
if !ram_once.contains_key(&task_id) {
let mut res = self.rams.lock().unwrap().get(&task_id).cloned();
let res = res.or_else(|| self.backup_rams.lock().unwrap().get(&task_id).cloned());
if res.is_some() {
return res;
}
}
let once = ram_once
.entry(task_id)
.or_insert(Arc::new(OnceLock::new()))
.clone();
drop(ram_once);
let once = self
.ram_once
.lock()
.unwrap()
.entry(task_id)
.or_insert(Arc::new(OnceLock::new()))
.clone();
once.get_or_init(|| {
let mut file = self.files.lock().unwrap().remove(&task_id)?;
let size = file.metadata().unwrap().size();
let mut cache = Cache::new(task_id, Some(size as usize));
io::copy(&mut file, &mut cache).unwrap();
Some(cache.complete_write())
})
.clone()
}
fn release_cache(&self, cache: Arc<Cache>) {
let size = cache.size().unwrap();
if cache.is_ram() {
self.handler.lock().unwrap().release(size);
}
self.ram_handle.lock().unwrap().release(cache.size());
}
pub(crate) fn get_cache(&self, url: String) -> Option<Arc<Cache>> {
self.caches.lock().unwrap().get(&url).cloned()
}
fn release_ram_cache(&self, release_size: usize) {
let mut released = 0;
while let Some((url, mut cache)) = self
.ram_cache_queue
.lock()
.unwrap()
.pop_front()
.and_then(|x| self.caches.lock().unwrap().get(&x).map(|c| (x, c.clone())))
{
let size = cache.size().unwrap();
released += size;
thread::spawn(move || {
let cache = cache.create_file_cache(&url).unwrap();
CacheManager::get_instance().update_cache(url, cache);
});
if released >= release_size {
break;
}
}
}
fn apply_ram_size(&self, apply_size: usize) {
let mut handler = self.handler.lock().unwrap();
handler.apply_ram_size(apply_size);
self.release_ram_cache(handler.need_release_size());
pub(super) fn apply_ram_size(&self, apply_size: usize) -> bool {
self.ram_handle.lock().unwrap().apply_ram_size(apply_size)
}
fn update_to_ram() {}
}
struct Handler {
struct Handle {
total_ram: usize,
used_ram: usize,
}
impl Handler {
impl Handle {
fn new(ram_cache_size: usize) -> Self {
Self {
total_ram: ram_cache_size,
@ -158,60 +196,139 @@ impl Handler {
#[cfg(test)]
mod test {
use std::io::{Read, Write};
use std::thread;
use std::time::Duration;
use request_utils::fastrand::fast_random;
use super::*;
use crate::cache;
const TEST_URL: &str = "小心猴子";
use crate::init;
const TEST_STRING: &str = "你这猴子真让我欢喜";
#[test]
fn ut_handler_size() {
let mut handler = Handler::new(DEFAULT_RAM_CACHE_SIZE);
assert!(!handler.apply_ram_size(DEFAULT_RAM_CACHE_SIZE + 1));
assert!(handler.apply_ram_size(1024));
assert_eq!(handler.need_release_size(), 0);
assert!(handler.apply_ram_size(DEFAULT_FILE_CACHE_SIZE));
assert_eq!(handler.need_release_size(), 1024);
handler.release(1024);
assert_eq!(handler.need_release_size(), 0);
handler.change_total_size(1024);
assert_eq!(handler.need_release_size(), DEFAULT_RAM_CACHE_SIZE - 1024);
fn ut_handle_size() {
let mut handle = Handle::new(DEFAULT_RAM_CACHE_SIZE);
assert!(!handle.apply_ram_size(DEFAULT_RAM_CACHE_SIZE + 1));
assert!(handle.apply_ram_size(1024));
assert_eq!(handle.need_release_size(), 0);
assert!(handle.apply_ram_size(DEFAULT_FILE_CACHE_SIZE));
assert_eq!(handle.need_release_size(), 1024);
handle.release(1024);
assert_eq!(handle.need_release_size(), 0);
handle.change_total_size(1024);
assert_eq!(handle.need_release_size(), DEFAULT_RAM_CACHE_SIZE - 1024);
}
#[test]
fn ut_cache_manager_basic() {
let cache_manager = CacheManager::new();
let mut cache = cache_manager.apply_for_cache(Some(TEST_STRING.len()));
let task_id = fast_random();
let cache_manager = CacheManager::get_instance();
let mut cache = cache_manager
.apply_for_cache(task_id, Some(TEST_STRING.len()))
.unwrap();
cache.write_all(TEST_STRING.as_bytes()).unwrap();
assert!(cache_manager.update_cache(TEST_URL.to_string(), cache));
let mut cache = cache_manager.get_cache(TEST_URL.to_string()).unwrap();
let cache = Arc::new(cache);
let mut buf = String::new();
cache.reader().read_to_string(&mut buf);
cache.cursor().read_to_string(&mut buf).unwrap();
assert_eq!(buf, TEST_STRING);
}
#[test]
fn ut_cache_manager_handler() {
let cache_manager = CacheManager::new();
assert_eq!(
cache_manager.handler.lock().unwrap().total_ram,
DEFAULT_RAM_CACHE_SIZE
);
let mut cache = cache_manager.apply_for_cache(Some(TEST_STRING.len()));
cache.write_all(TEST_STRING.as_bytes()).unwrap();
assert!(cache_manager.update_cache(TEST_URL.to_string(), cache));
assert_eq!(
cache_manager.handler.lock().unwrap().used_ram,
TEST_STRING.len()
);
fn ut_cache_manager_update() {
init();
let task_id = fast_random();
let cache_manager = CacheManager::get_instance();
let mut cache = cache_manager.apply_for_cache(Some(TEST_STRING.len()));
let mut cache = cache_manager
.apply_for_cache(task_id, Some(TEST_STRING.len()))
.unwrap();
cache.write_all(TEST_STRING.as_bytes()).unwrap();
assert!(cache_manager.update_cache(TEST_URL.to_string(), cache));
assert_eq!(
cache_manager.handler.lock().unwrap().used_ram,
TEST_STRING.len()
);
let cache = Arc::new(cache);
cache_manager.update_cache(task_id, cache);
let cache = cache_manager.get_cache(task_id).unwrap();
let mut buf = String::new();
cache.cursor().read_to_string(&mut buf).unwrap();
assert_eq!(buf, TEST_STRING);
}
#[test]
fn ut_cache_manager_file() {
init();
let task_id = fast_random();
let cache_manager = CacheManager::get_instance();
let mut cache = cache_manager
.apply_for_cache(task_id, Some(TEST_STRING.len()))
.unwrap();
cache.write_all(TEST_STRING.as_bytes()).unwrap();
let cache = Arc::new(cache);
cache_manager.update_cache(task_id, cache);
thread::sleep(Duration::from_millis(100));
let mut file = cache_manager
.files
.lock()
.unwrap()
.remove(&task_id)
.unwrap();
let mut buf = String::new();
file.read_to_string(&mut buf).unwrap();
assert_eq!(buf, TEST_STRING);
}
#[test]
fn ut_cache_manager_ram_backup() {
init();
let task_id = fast_random();
let cache_manager = CacheManager::get_instance();
let mut cache = cache_manager
.apply_for_cache(task_id, Some(TEST_STRING.len()))
.unwrap();
cache.write_all(TEST_STRING.as_bytes()).unwrap();
let cache = Arc::new(cache);
cache_manager.update_cache(task_id, cache);
assert!(cache_manager
.backup_rams
.lock()
.unwrap()
.contains_key(&task_id));
thread::sleep(Duration::from_millis(100));
assert!(!cache_manager
.backup_rams
.lock()
.unwrap()
.contains_key(&task_id));
}
#[test]
fn ut_cache_manager_cache_from_file() {
init();
let task_id = fast_random();
let cache_manager = CacheManager::get_instance();
let mut cache = cache_manager
.apply_for_cache(task_id, Some(TEST_STRING.len()))
.unwrap();
cache.write_all(TEST_STRING.as_bytes()).unwrap();
let cache = Arc::new(cache);
cache_manager.update_cache(task_id, cache);
thread::sleep(Duration::from_millis(100));
cache_manager.rams.lock().unwrap().remove(&task_id);
let mut v = vec![];
for _ in 0..1024 {
v.push(thread::spawn(move || {
let cache = CacheManager::get_instance().get_cache(task_id).unwrap();
let mut buf = String::new();
cache.cursor().read_to_string(&mut buf).unwrap();
buf == TEST_STRING
}));
}
for t in v {
assert!(t.join().unwrap());
}
}
}

View File

@ -17,4 +17,7 @@ mod manage;
mod update;
pub(crate) use data::Cache;
pub(crate) use fetch::Fetcher;
pub(crate) use manage::CacheManager;
pub(crate) use update::Updater;

View File

@ -9,4 +9,35 @@
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// limitations under the License.
use crate::agent::CustomCallback;
use crate::download::{download, CancelHandle, TaskHandle};
pub(crate) struct Updater {
handle: TaskHandle,
}
impl Updater {
pub(crate) fn new(task_id: u64, url: &str, callback: Box<dyn CustomCallback>) -> Self {
let task_handle = download(task_id, url, Some(callback));
Self {
handle: task_handle,
}
}
pub(crate) fn try_add_callback(
&mut self,
callback: Box<dyn CustomCallback>,
) -> Result<(), Box<dyn CustomCallback>> {
self.handle.try_add_callback(callback)
}
pub(crate) fn cancel(&mut self) {
self.handle.cancel();
}
pub(crate) fn cancel_handle(&self) -> CancelHandle {
self.handle.cancel_handle()
}
}

View File

@ -13,8 +13,14 @@
* limitations under the License.
*/
#include "download/wrapper.rs.h"
#include "pre_download.h"
#include "request_pre_download.h"
#include "wrapper.rs.h"
namespace OHOS::Request {
PreDownloadAgent::PreDownloadAgent()
{
}
} // namespace OHOS::Request

View File

@ -0,0 +1,205 @@
// Copyright (C) 2024 Huawei Device Co., Ltd.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::io::Write;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
pub(crate) use super::CancelHandle;
use super::{DownloadTask, HttpClientError, RequestCallback, RequestTask, Response};
use crate::agent::CustomCallback;
use crate::cache::{Cache, CacheManager};
pub(crate) struct DownloadCallback {
task_id: u64,
cache: Option<Cache>,
finish: Arc<AtomicBool>,
callbacks: Arc<Mutex<Vec<Box<dyn CustomCallback>>>>,
}
impl DownloadCallback {
pub(crate) fn new(
task_id: u64,
finish: Arc<AtomicBool>,
callbacks: Arc<Mutex<Vec<Box<dyn CustomCallback>>>>,
) -> Self {
Self {
task_id,
cache: None,
finish,
callbacks,
}
}
}
impl RequestCallback for DownloadCallback {
fn on_success(&mut self, response: Response) {
info!("{} success with code {}", self.task_id, response.status());
let mut cache = self.cache.take().unwrap();
let cache = cache.complete_write();
self.finish.store(true, Ordering::Release);
let mut callbacks = self.callbacks.lock().unwrap();
while let Some(mut callback) = callbacks.pop() {
callback.on_success(cache.clone());
}
}
fn on_fail(&mut self, error: HttpClientError) {
error!("{} download fail {}", self.task_id, error,);
self.finish.store(true, Ordering::Release);
let mut callbacks = self.callbacks.lock().unwrap();
while let Some(mut callback) = callbacks.pop() {
callback.on_fail(&error.to_string());
}
}
fn on_cancel(&mut self) {
info!("{} cancel download", self.task_id);
self.finish.store(true, Ordering::Release);
let mut callbacks = self.callbacks.lock().unwrap();
while let Some(mut callback) = callbacks.pop() {
callback.on_cancel();
}
}
fn on_data_receive(&mut self, data: &[u8], mut task: RequestTask) {
if self.cache.is_none() {
let headers = task.headers();
let length = parse_content_length(&headers);
self.cache = Some(
CacheManager::get_instance()
.apply_for_cache(self.task_id, length)
.unwrap(),
);
}
self.cache.as_mut().unwrap().write_all(data).unwrap();
}
fn on_progress(&mut self, dl_total: u64, dl_now: u64, ul_total: u64, ul_now: u64) {}
}
fn parse_content_length(headers: &str) -> Option<usize> {
headers.find("content-length").and_then(|position| {
headers
.split_at(position)
.1
.lines()
.next()
.and_then(|line| {
line.find(":").map(|position| {
line.split_at(position + 1)
.1
.trim()
.parse::<usize>()
.unwrap_or(0)
})
})
})
}
pub struct TaskHandle {
cancel: CancelHandle,
finish: Arc<AtomicBool>,
callbacks: Arc<Mutex<Vec<Box<dyn CustomCallback>>>>,
}
impl TaskHandle {
pub(crate) fn new(
cancel: CancelHandle,
finish: Arc<AtomicBool>,
callbacks: Arc<Mutex<Vec<Box<dyn CustomCallback>>>>,
) -> Self {
Self {
cancel,
finish,
callbacks,
}
}
pub(crate) fn cancel(&mut self) {
self.cancel.cancel();
}
pub(crate) fn cancel_handle(&self) -> CancelHandle {
self.cancel.clone()
}
pub(crate) fn try_add_callback(
&mut self,
callback: Box<dyn CustomCallback>,
) -> Result<(), Box<dyn CustomCallback>> {
let mut callbacks = self.callbacks.lock().unwrap();
if !self.finish.load(Ordering::Acquire) {
callbacks.push(callback);
Ok(())
} else {
Err(callback)
}
}
}
pub(crate) fn download(
task_id: u64,
url: &str,
callback: Option<Box<dyn CustomCallback>>,
) -> TaskHandle {
let callbacks = match callback {
Some(callback) => Arc::new(Mutex::new(vec![callback])),
None => Arc::new(Mutex::new(vec![])),
};
let finish = Arc::new(AtomicBool::new(false));
let callback = DownloadCallback::new(task_id, finish.clone(), callbacks.clone());
let cancel_handle = DownloadTask::run(url, callback);
TaskHandle::new(cancel_handle, finish, callbacks)
}
#[cfg(test)]
mod test {
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
use request_utils::fastrand::fast_random;
use super::*;
use crate::agent::CustomCallback;
use crate::TEST_URL;
struct TestCallback {
flag: Arc<AtomicBool>,
}
impl CustomCallback for TestCallback {
fn on_success(&mut self, data: Arc<Cache>) {
if data.size() != 0 {
self.flag.store(true, Ordering::Release);
}
}
}
#[test]
fn ut_pre_download() {
let success_flag = Arc::new(AtomicBool::new(false));
download(
fast_random(),
TEST_URL,
Some(Box::new(TestCallback {
flag: success_flag.clone(),
})),
);
std::thread::sleep(Duration::from_secs(1));
assert!(success_flag.load(Ordering::Acquire));
}
}

View File

@ -11,6 +11,27 @@
// See the License for the specific language governing permissions and
// limitations under the License.
mod common;
mod error;
mod task;
mod wrapper;
cfg_ohos! {
mod netstack;
use netstack_rs::task::RequestTask;
pub use netstack::CancelHandle;
use netstack::DownloadTask;
use netstack_rs::request::RequestCallback;
use netstack_rs::response::Response;
use netstack_rs::error::HttpClientError;
}
cfg_not_ohos! {
mod ylong;
pub use ylong::CancelHandle;
pub use ylong::RequestTask;
use ylong::DownloadTask;
use ylong::RequestCallback;
use ylong::Response;
use ylong_http_client::HttpClientError;
}
pub(crate) use common::{download, TaskHandle};

View File

@ -0,0 +1,41 @@
// Copyright (C) 2024 Huawei Device Co., Ltd.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use netstack_rs::request::Request;
use netstack_rs::task::RequestTask;
use super::common::DownloadCallback;
pub(crate) struct DownloadTask;
impl DownloadTask {
pub(crate) fn run(url: &str, callback: DownloadCallback) -> CancelHandle {
let mut request = Request::new();
request.url(url);
request.callback(callback);
let mut task = request.build();
task.start();
CancelHandle { inner: task }
}
}
#[derive(Clone)]
pub struct CancelHandle {
inner: RequestTask,
}
impl CancelHandle {
pub(crate) fn cancel(&mut self) {
self.inner.cancel();
}
}

View File

@ -1,119 +0,0 @@
// Copyright (C) 2024 Huawei Device Co., Ltd.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::io::Write;
use cxx::UniquePtr;
use netstack_rs::error::HttpClientError;
use netstack_rs::request::{Request, RequestCallback};
use netstack_rs::response::Response;
use netstack_rs::task::RequestTask;
use super::wrapper::ffi::PreDownloadTaskCallback;
use super::wrapper::UserCallback;
use crate::cache::{Cache, CacheManager};
pub(super) struct PreDownloadCallback {
cache: Option<Cache>,
user_callback: Option<UserCallback>,
}
impl RequestCallback for PreDownloadCallback {
fn on_success(&mut self, response: Response) {
if let Some(ref callback) = self.user_callback {
callback.on_success();
}
self.cache.as_mut().unwrap().update_cache_size();
CacheManager::get_instance().update_cache(
"http://192.168.0.101/aaa.png".to_string(),
self.cache.take().unwrap(),
);
}
fn on_fail(&mut self, response: Response, error: HttpClientError) {
if let Some(ref callback) = self.user_callback {
callback.on_fail();
}
}
fn on_cancel(&mut self, response: Response) {
if let Some(ref callback) = self.user_callback {
callback.on_cancel();
}
}
fn on_data_receive(&mut self, data: &[u8]) {
if self.cache.is_none() {
self.cache = Some(CacheManager::get_instance().apply_for_cache(None));
}
let cache = self.cache.as_mut().unwrap();
cache.write_all(data);
}
fn on_progress(&mut self, dl_total: u64, dl_now: u64, ul_total: u64, ul_now: u64) {}
}
pub(super) struct DownloadTask;
impl DownloadTask {
pub(super) fn run(
mut request: Request<PreDownloadCallback>,
user_callback: Option<UserCallback>,
) -> TaskHandle {
request.callback(PreDownloadCallback {
cache: None,
user_callback,
});
let mut task = request.build();
task.start();
TaskHandle { inner: task }
}
}
pub struct TaskHandle {
inner: RequestTask,
}
impl TaskHandle {
pub(crate) fn cancel(&mut self) {
self.inner.cancel();
}
}
#[cfg(test)]
mod test {
use std::io::{Read, Seek};
use std::time::Duration;
use netstack_rs::request::Request;
use super::{DownloadTask, PreDownloadCallback};
use crate::cache::CacheManager;
const TEST_URL: &str = "http://192.168.0.101/aaa.png";
const FILE_LEN: usize = 9561;
#[test]
fn ut_predownload() {
let mut request = Request::new();
request.url(TEST_URL);
let task = DownloadTask::run(request, None);
std::thread::sleep(Duration::from_secs(10));
let cache = CacheManager::get_instance()
.get_cache(TEST_URL.to_string())
.unwrap();
let mut buf = vec![];
let size = cache.reader().read_to_end(&mut buf).unwrap();
assert_eq!(buf.len(), FILE_LEN);
}
}

View File

@ -1,80 +0,0 @@
// Copyright (C) 2024 Huawei Device Co., Ltd.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use cxx::UniquePtr;
use ffi::{PreDownloadOptions, PreDownloadTaskCallback};
use netstack_rs::request::Request;
use super::task::{DownloadTask, TaskHandle};
pub fn pre_download(url: String, options: PreDownloadOptions) -> Box<TaskHandle> {
let mut request = Request::new();
request.url(&url);
Box::new(DownloadTask::run(
request,
UserCallback::from_ffi(options.callback),
))
}
pub(super) struct UserCallback {
inner: UniquePtr<PreDownloadTaskCallback>,
}
impl UserCallback {
fn from_ffi(ffi: UniquePtr<PreDownloadTaskCallback>) -> Option<Self> {
if ffi.is_null() {
None
} else {
Some(Self { inner: ffi })
}
}
#[inline]
pub(super) fn on_success(&self) {
self.inner.OnSuccess();
}
#[inline]
pub(super) fn on_fail(&self) {
self.inner.OnFail();
}
#[inline]
pub(super) fn on_cancel(&self) {
self.inner.OnCancel();
}
}
#[cxx::bridge(namespace = "OHOS::Request")]
pub(crate) mod ffi {
struct PreDownloadOptions {
callback: UniquePtr<PreDownloadTaskCallback>,
}
extern "Rust" {
type DownloadTask;
type TaskHandle;
fn pre_download(url: String, options: PreDownloadOptions) -> Box<TaskHandle>;
fn cancel(self: &mut TaskHandle);
}
unsafe extern "C++" {
include!("pre_download.h");
type PreDownloadTaskCallback;
fn OnSuccess(self: &PreDownloadTaskCallback);
fn OnFail(self: &PreDownloadTaskCallback);
fn OnCancel(self: &PreDownloadTaskCallback);
}
}

View File

@ -0,0 +1,33 @@
// Copyright (C) 2024 Huawei Device Co., Ltd.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::LazyLock;
use ylong_http_client::async_impl::Client;
use ylong_http_client::{Redirect, Timeout, TlsVersion};
const CONNECT_TIMEOUT: u64 = 60;
const SECONDS_IN_ONE_WEEK: u64 = 7 * 24 * 60 * 60;
pub(crate) fn client() -> &'static Client {
static CLIENT: LazyLock<Client> = LazyLock::new(|| {
let client = Client::builder()
.connect_timeout(Timeout::from_secs(CONNECT_TIMEOUT))
.request_timeout(Timeout::from_secs(SECONDS_IN_ONE_WEEK))
.min_tls_version(TlsVersion::TLS_1_2)
.redirect(Redirect::limited(usize::MAX))
.tls_built_in_root_certs(true);
client.build().unwrap()
});
&CLIENT
}

View File

@ -0,0 +1,170 @@
// Copyright (C) 2024 Huawei Device Co., Ltd.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
mod client;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll};
use client::client;
use ylong_http_client::async_impl::{
Body, DownloadOperator, Downloader, PercentEncoder, RequestBuilder,
};
use ylong_http_client::{ErrorKind, HttpClientError, StatusCode};
use super::common::DownloadCallback;
struct Operator<'a> {
callback: &'a mut DownloadCallback,
abort_flag: Arc<AtomicBool>,
headers: String,
}
impl<'a> DownloadOperator for Operator<'a> {
fn poll_download(
self: Pin<&mut Self>,
_cx: &mut Context<'_>,
data: &[u8],
) -> Poll<Result<usize, HttpClientError>> {
let me = self.get_mut();
me.callback.on_data_receive(
data,
RequestTask {
headers: me.headers.clone(),
},
);
Poll::Ready(Ok(data.len()))
}
fn poll_progress(
self: Pin<&mut Self>,
_cx: &mut Context<'_>,
downloaded: u64,
total: Option<u64>,
) -> Poll<Result<(), HttpClientError>> {
let me = self.get_mut();
me.callback
.on_progress(total.unwrap_or_default(), downloaded, 0, 0);
if me.abort_flag.load(Ordering::Acquire) {
Poll::Ready(Err(HttpClientError::user_aborted()))
} else {
Poll::Ready(Ok(()))
}
}
}
pub struct RequestTask {
headers: String,
}
impl RequestTask {
pub(crate) fn headers(&self) -> String {
self.headers.clone()
}
}
pub struct DownloadTask;
impl DownloadTask {
pub(crate) fn run(url: &str, mut callback: DownloadCallback) -> CancelHandle {
let url = match PercentEncoder::encode(url) {
Ok(url) => url,
Err(e) => {
callback.on_fail(e);
return CancelHandle {
inner: Arc::new(AtomicBool::new(false)),
};
}
};
let flag = Arc::new(AtomicBool::new(false));
let handle = CancelHandle {
inner: flag.clone(),
};
ylong_runtime::spawn(async move {
if let Err(e) = download(url, &mut callback, flag).await {
if e.error_kind() == ErrorKind::UserAborted {
callback.on_cancel();
} else {
callback.on_fail(e);
}
}
});
handle
}
}
pub async fn download(
url: String,
callback: &mut DownloadCallback,
abort_flag: Arc<AtomicBool>,
) -> Result<(), HttpClientError> {
let request = RequestBuilder::new()
.url(url.as_str())
.method("GET")
.body(Body::empty())
.unwrap();
let response = client().request(request).await?;
let status = response.status();
let operator = Operator {
callback: callback,
abort_flag: abort_flag,
headers: response.headers().to_string(),
};
let mut downloader = Downloader::builder()
.body(response)
.operator(operator)
.build();
downloader.download().await?;
let response = Response { status: status };
callback.on_success(response);
Ok(())
}
pub struct Response {
status: StatusCode,
}
impl Response {
pub fn status(&self) -> StatusCode {
self.status
}
}
#[derive(Clone)]
pub struct CancelHandle {
inner: Arc<AtomicBool>,
}
impl CancelHandle {
pub fn cancel(&mut self) {
self.inner.store(true, Ordering::Release);
}
}
/// RequestCallback
#[allow(unused_variables)]
pub trait RequestCallback {
/// Called when the request is successful.
fn on_success(&mut self, response: Response) {}
/// Called when the request fails.
fn on_fail(&mut self, error: HttpClientError) {}
/// Called when the request is canceled.
fn on_cancel(&mut self) {}
/// Called when data is received.
fn on_data_receive(&mut self, data: &[u8], mut task: RequestTask) {}
/// Called when progress is made.
fn on_progress(&mut self, dl_total: u64, dl_now: u64, ul_total: u64, ul_now: u64) {}
}

View File

@ -13,15 +13,39 @@
//! # Pre-download native
#![warn(missing_docs)]
#![allow(unused)]
#![allow(missing_docs)]
#![allow(stable_features)]
#![feature(lazy_cell)]
#![allow(unused)]
#[macro_use]
extern crate request_utils;
mod agent;
pub use agent::{CustomCallback, DownloadAgent};
mod cache;
#[cfg(feature = "ohos")]
mod download;
/// a
fn a() {}
cfg_ohos! {
mod wrapper;
const TAG: &str = "PreDownloadNative\0";
const DOMAIN: u32 = 0xD001C50;
use ffrt_rs::ffrt_spawn as spawn;
}
cfg_not_ohos! {
use std::thread::spawn as spawn;
}
cfg_test! {
#[cfg(not(feature = "ohos"))]
fn init() {
let _ = env_logger::builder().is_test(true).format_timestamp_millis().try_init();
}
#[cfg(feature = "ohos")]
fn init() {}
const TEST_URL: &str =
"http://www.baidu.com";
}

View File

@ -0,0 +1,82 @@
// Copyright (C) 2024 Huawei Device Co., Ltd.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use cxx::UniquePtr;
use ffi::PreDownloadCallback;
use crate::agent::DownloadAgent;
use crate::cache::Cache;
use crate::download::CancelHandle;
use crate::CustomCallback;
pub(super) struct FfiCallback {
inner: UniquePtr<PreDownloadCallback>,
}
unsafe impl Send for FfiCallback {}
impl FfiCallback {
pub(crate) fn from_ffi(ffi: UniquePtr<PreDownloadCallback>) -> Option<Self> {
if ffi.is_null() {
None
} else {
Some(Self { inner: ffi })
}
}
}
impl CustomCallback for FfiCallback {
fn on_success(&mut self, data: Arc<Cache>) {
self.inner.OnSuccess();
}
fn on_fail(&mut self, error: &str) {
self.inner.OnFail();
}
fn on_cancel(&mut self) {
self.inner.OnCancel();
}
}
fn download_agent() -> &'static DownloadAgent {
DownloadAgent::get_instance()
}
#[cxx::bridge(namespace = "OHOS::Request")]
pub(crate) mod ffi {
extern "Rust" {
type DownloadAgent;
type CancelHandle;
fn cancel(self: &mut CancelHandle);
fn ffi_pre_download(
self: &DownloadAgent,
url: String,
mut callback: UniquePtr<PreDownloadCallback>,
update: bool,
);
fn download_agent() -> &'static DownloadAgent;
}
unsafe extern "C++" {
include!("request_pre_download.h");
type PreDownloadCallback;
fn OnSuccess(self: &PreDownloadCallback);
fn OnFail(self: &PreDownloadCallback);
fn OnCancel(self: &PreDownloadCallback);
}
}

View File

@ -18,3 +18,4 @@ edition = "2021"
[dependencies]
cxx = "1.0.115"
request_utils = { path = "../../request_utils" }

View File

@ -30,11 +30,12 @@ void OnCallback(std::shared_ptr<HttpClientTask> task, rust::Box<CallbackWrapper>
task->OnSuccess(
[shared](const HttpClientRequest &, const HttpClientResponse &response) { shared->on_success(response); });
task->OnFail([shared](const HttpClientRequest &, const HttpClientResponse &response,
const HttpClientError &error) { shared->on_fail(response, error); });
const HttpClientError &error) { shared->on_fail(error); });
task->OnCancel(
[shared](const HttpClientRequest &, const HttpClientResponse &response) { shared->on_cancel(response); });
task->OnDataReceive(
[shared](const HttpClientRequest &, const uint8_t *data, size_t size) { shared->on_data_receive(data, size); });
task->OnDataReceive([shared, task](const HttpClientRequest &, const uint8_t *data, size_t size) {
shared->on_data_receive(task, data, size);
});
task->OnProgress([shared](const HttpClientRequest &, u_long dlTotal, u_long dlNow, u_long ulTotal, u_long ulNow) {
shared->on_progress(dlTotal, dlNow, ulTotal, ulNow);
});

View File

@ -50,12 +50,7 @@ impl Display for HttpClientError {
impl Debug for HttpClientError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"HttpClientError: code {:?} msg {}",
self.code(),
self.msg()
)
write!(f, "code {:?}, msg {}", self.code(), self.msg())
}
}

View File

@ -95,11 +95,11 @@ pub trait RequestCallback {
/// Called when the request is successful.
fn on_success(&mut self, response: Response) {}
/// Called when the request fails.
fn on_fail(&mut self, response: Response, error: HttpClientError) {}
fn on_fail(&mut self, error: HttpClientError) {}
/// Called when the request is canceled.
fn on_cancel(&mut self, response: Response) {}
fn on_cancel(&mut self) {}
/// Called when data is received.
fn on_data_receive(&mut self, data: &[u8]) {}
fn on_data_receive(&mut self, data: &[u8], task: RequestTask) {}
/// Called when progress is made.
fn on_progress(&mut self, dl_total: u64, dl_now: u64, ul_total: u64, ul_now: u64) {}
}

View File

@ -12,6 +12,7 @@
// limitations under the License.
use std::default;
use std::fmt::Display;
use crate::wrapper::ffi::HttpClientResponse;
@ -22,7 +23,7 @@ pub struct Response<'a> {
impl<'a> Response<'a> {
/// Get Response Code
pub fn code(&self) -> ResponseCode {
pub fn status(&self) -> ResponseCode {
self.inner
.GetResponseCode()
.try_into()
@ -30,12 +31,16 @@ impl<'a> Response<'a> {
.unwrap_or_default()
}
pub fn headers(&self) -> String {
self.inner.GetHeader().to_string()
}
pub(crate) fn from_ffi(inner: &'a HttpClientResponse) -> Self {
Self { inner }
}
}
#[derive(Debug, Default)]
#[derive(Clone, Debug, Default)]
pub enum ResponseCode {
#[default]
None = 0,
@ -75,3 +80,10 @@ pub enum ResponseCode {
GatewayTimeout,
Version,
}
impl Display for ResponseCode {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let code = self.clone() as i32;
write!(f, "{} {:?}", code, self)
}
}

View File

@ -17,14 +17,18 @@ use std::pin::Pin;
use cxx::SharedPtr;
use crate::request::{Request, RequestCallback};
use crate::response::Response;
use crate::wrapper::ffi::{HttpClientRequest, HttpClientTask, NewHttpClientTask, OnCallback};
use crate::wrapper::CallbackWrapper;
/// RequestTask
#[derive(Clone)]
pub struct RequestTask {
inner: SharedPtr<HttpClientTask>,
}
unsafe impl Send for RequestTask {}
/// RequestTask status
#[derive(Debug, Default)]
pub enum TaskStatus {
@ -42,6 +46,10 @@ impl RequestTask {
}
}
pub(crate) fn from_ffi(inner: SharedPtr<HttpClientTask>) -> Self {
Self { inner }
}
/// start the request task
pub fn start(&mut self) -> bool {
self.pin_mut().Start()
@ -61,6 +69,14 @@ impl RequestTask {
.unwrap_or_default()
}
pub fn response(&mut self) -> Response {
Response::from_ffi(self.pin_mut().GetResponse().into_ref().get_ref())
}
pub fn headers(&mut self) -> String {
self.response().headers()
}
pub(crate) fn callback(&mut self, callback: impl RequestCallback + 'static) {
OnCallback(
self.inner.clone(),

View File

@ -13,10 +13,12 @@
use std::ffi::c_void;
use cxx::SharedPtr;
use crate::error::{HttpClientError, HttpErrorCode};
use crate::request::{Request, RequestCallback};
use crate::response::{self, Response, ResponseCode};
use crate::task::TaskStatus;
use crate::task::{RequestTask, TaskStatus};
pub struct CallbackWrapper {
inner: Box<dyn RequestCallback>,
@ -36,19 +38,23 @@ impl CallbackWrapper {
self.inner.on_success(response);
}
fn on_fail(&mut self, response: &ffi::HttpClientResponse, error: &ffi::HttpClientError) {
let response = Response::from_ffi(response);
fn on_fail(&mut self, error: &ffi::HttpClientError) {
let error = HttpClientError::from_ffi(error);
self.inner.on_fail(response, error);
self.inner.on_fail(error);
}
fn on_cancel(&mut self, response: &ffi::HttpClientResponse) {
let response = Response::from_ffi(response);
self.inner.on_cancel(response);
self.inner.on_cancel();
}
fn on_data_receive(&mut self, data: *const u8, size: usize) {
fn on_data_receive(
&mut self,
task: SharedPtr<ffi::HttpClientTask>,
data: *const u8,
size: usize,
) {
let data = unsafe { std::slice::from_raw_parts(data, size) };
self.inner.on_data_receive(data);
let task = RequestTask::from_ffi(task);
self.inner.on_data_receive(data, task);
}
fn on_progress(&mut self, dl_total: u64, dl_now: u64, ul_total: u64, ul_now: u64) {
self.inner.on_progress(dl_total, dl_now, ul_total, ul_now);
@ -60,13 +66,14 @@ pub(crate) mod ffi {
extern "Rust" {
type CallbackWrapper;
fn on_success(self: &mut CallbackWrapper, response: &HttpClientResponse);
fn on_fail(
self: &mut CallbackWrapper,
response: &HttpClientResponse,
error: &HttpClientError,
);
fn on_fail(self: &mut CallbackWrapper, error: &HttpClientError);
fn on_cancel(self: &mut CallbackWrapper, response: &HttpClientResponse);
unsafe fn on_data_receive(self: &mut CallbackWrapper, data: *const u8, size: usize);
unsafe fn on_data_receive(
self: &mut CallbackWrapper,
task: SharedPtr<HttpClientTask>,
data: *const u8,
size: usize,
);
fn on_progress(
self: &mut CallbackWrapper,
dl_total: u64,
@ -105,6 +112,7 @@ pub(crate) mod ffi {
type HttpClientTask;
fn NewHttpClientTask(request: &HttpClientRequest) -> SharedPtr<HttpClientTask>;
fn GetResponse(self: Pin<&mut HttpClientTask>) -> Pin<&mut HttpClientResponse>;
fn Start(self: Pin<&mut HttpClientTask>) -> bool;
fn Cancel(self: Pin<&mut HttpClientTask>);
fn GetStatus(self: Pin<&mut HttpClientTask>) -> TaskStatus;

View File

@ -16,15 +16,11 @@
use std::time::Duration;
use netstack_rs::request::{Request, RequestCallback};
use netstack_rs::response::Response;
struct Callback {}
impl RequestCallback for Callback {
fn on_fail(
&mut self,
response: netstack_rs::response::Response,
error: netstack_rs::error::HttpClientError,
) {
}
fn on_fail(&mut self, error: netstack_rs::error::HttpClientError) {}
}
#[test]

View File

@ -14,6 +14,37 @@
import("//base/request/request/request_aafwk.gni")
import("//build/test.gni")
rust_cxx("request_utils_cxx_gen") {
sources = [ "src/wrapper.rs" ]
}
ohos_static_library("request_utils_cxx") {
sanitize = {
integer_overflow = true
ubsan = true
boundary_sanitize = true
cfi = true
cfi_cross_dso = true
debug = false
}
stack_protector_ret = true
include_dirs = [ "${target_gen_dir}/src" ]
sources = []
sources += get_target_outputs(":request_utils_cxx_gen")
external_deps = [ "hilog:libhilog" ]
deps = [
":request_utils_cxx_gen",
"//third_party/rust/crates/cxx:cxx_cppdeps",
]
subsystem_name = "request"
part_name = "request"
}
ohos_rust_static_library("request_utils") {
sanitize = {
integer_overflow = true
@ -24,9 +55,32 @@ ohos_rust_static_library("request_utils") {
debug = false
}
sources = [ "src/lib.rs" ]
features = [ "ohos" ]
external_deps = [ "hilog:hilog_rust" ]
deps = [
":request_utils_cxx",
"//third_party/rust/crates/cxx:lib",
]
external_deps = [ "hilog:libhilog" ]
sources = [ "src/lib.rs" ]
subsystem_name = "request"
part_name = "request"
}
ohos_rust_unittest("rust_request_utils_ut_test") {
module_out_path = "request/predownload"
sources = [ "src/lib.rs" ]
external_deps = [ "hilog:libhilog" ]
rustflags = [ "--cfg=feature=\"ohos\"" ]
deps = [
":request_utils_cxx",
"//third_party/rust/crates/cxx:lib",
]
subsystem_name = "request"
part_name = "request"

View File

@ -16,4 +16,14 @@ name = "request_utils"
version = "0.1.0"
edition = "2021"
[features]
default = []
ohos = [
"cxx",
]
[dependencies]
cxx = { version = "1.0.115", optional = true }
log = "0.4.22"

View File

@ -0,0 +1,57 @@
// Copyright (c) 2023 Huawei Device Co., Ltd.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! A simple fast pseudorandom implementation, ranges from 0 to usize::MAX
//! Reference: xorshift* <https://dl.acm.org/doi/10.1145/2845077>
use std::cell::Cell;
use std::collections::hash_map::RandomState;
use std::hash::{BuildHasher, Hasher};
use std::num::Wrapping;
/// Generates a fast random ranging from 0 to usize::MAX
///
/// # Examples
/// ```rust
/// use ylong_runtime::fastrand::fast_random;
/// let rand = fast_random();
/// assert!(rand <= u64::MAX);
/// ```
pub fn fast_random() -> u64 {
thread_local! {
static RNG: Cell<Wrapping<u64>> = Cell::new(Wrapping(seed()));
}
RNG.with(|rng| {
let mut s = rng.get();
s ^= s >> 12;
s ^= s << 25;
s ^= s >> 27;
rng.set(s);
s.0.wrapping_mul(0x2545_f491_4f6c_dd1d)
})
}
fn seed() -> u64 {
let seed = RandomState::new();
let mut out = 0;
let mut count = 0;
while out == 0 {
count += 1;
let mut hasher = seed.build_hasher();
hasher.write_usize(count);
out = hasher.finish();
}
out
}

View File

@ -15,10 +15,8 @@
#[macro_export]
macro_rules! debug {
($fmt: literal $(, $args:expr)* $(,)?) => {{
use hilog_rust::hilog;
use std::ffi::{c_char, CString};
use crate::LOG_LABEL;
hilog_rust::debug!(LOG_LABEL, $fmt $(, @public($args))*);
let fmt = format!($fmt $(, $args)*);
$crate::hilog_print($crate::LogLevel::LOG_DEBUG, crate::DOMAIN, crate::TAG, fmt);
}}
}
@ -26,11 +24,8 @@ macro_rules! debug {
#[macro_export]
macro_rules! info {
($fmt: literal $(, $args:expr)* $(,)?) => {{
use hilog_rust::hilog;
use std::ffi::{c_char, CString};
use crate::LOG_LABEL;
hilog_rust::info!(LOG_LABEL, $fmt $(, @public($args))*);
let fmt = format!($fmt $(, $args)*);
$crate::hilog_print($crate::LogLevel::LOG_INFO, crate::DOMAIN, crate::TAG, fmt);
}}
}
@ -38,10 +33,7 @@ macro_rules! info {
#[macro_export]
macro_rules! error {
($fmt: literal $(, $args:expr)* $(,)?) => {{
use hilog_rust::hilog;
use std::ffi::{c_char, CString};
use crate::LOG_LABEL;
hilog_rust::error!(LOG_LABEL, $fmt $(, @public($args))*);
let fmt = format!($fmt $(, $args)*);
$crate::hilog_print($crate::LogLevel::LOG_ERROR, crate::DOMAIN, crate::TAG, fmt);
}}
}

View File

@ -16,3 +16,23 @@
#![warn(missing_docs)]
#![allow(clippy::crate_in_macro_def)]
#![allow(missing_docs)]
#![allow(unused)]
#[macro_use]
mod macros;
pub mod fastrand;
pub mod queue_map;
cfg_not_ohos! {
#[macro_use]
pub use log::{debug, error, info};
}
cfg_ohos! {
#[macro_use]
mod hilog;
mod wrapper;
pub use wrapper::{hilog_print, LogLevel, LogType};
}

View File

@ -0,0 +1,44 @@
// Copyright (C) 2024 Huawei Device Co., Ltd.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#[macro_export]
macro_rules! cfg_test {
($($item:item)*) => {
$(
#[cfg(test)]
$item
)*
}
}
#[macro_use]
#[macro_export]
macro_rules! cfg_ohos {
($($item:item)*) => {
$(
#[cfg(feature = "ohos")]
$item
)*
}
}
#[macro_use]
#[macro_export]
macro_rules! cfg_not_ohos {
($($item:item)*) => {
$(
#[cfg(not(feature = "ohos"))]
$item
)*
}
}

View File

@ -14,13 +14,13 @@
use std::collections::{HashMap, HashSet, VecDeque};
use std::hash::Hash;
pub struct QueueMap<N, T> {
map: HashMap<N, T>,
v: VecDeque<N>,
removed: HashSet<N>,
pub struct QueueMap<K, V> {
map: HashMap<K, V>,
v: VecDeque<K>,
removed: HashSet<K>,
}
impl<N: Eq + Hash + Clone, T> QueueMap<N, T> {
impl<K: Eq + Hash + Clone, V> QueueMap<K, V> {
pub fn new() -> Self {
Self {
map: HashMap::new(),
@ -29,7 +29,7 @@ impl<N: Eq + Hash + Clone, T> QueueMap<N, T> {
}
}
pub fn pop_front(&mut self) -> Option<T> {
pub fn pop(&mut self) -> Option<V> {
while let Some(n) = self.v.pop_front() {
if !self.removed.remove(&n) {
let ret = self.map.remove(&n);
@ -40,27 +40,31 @@ impl<N: Eq + Hash + Clone, T> QueueMap<N, T> {
None
}
pub fn push_back(&mut self, n: N, t: T) {
self.removed.remove(&n);
self.v.push_back(n.clone());
self.map.insert(n, t);
pub fn insert(&mut self, k: K, v: V) -> Option<V> {
self.removed.remove(&k);
self.v.push_back(k.clone());
self.map.insert(k, v)
}
pub fn contains_key(&self, n: &N) -> bool {
self.map.contains_key(n)
pub fn get(&self, k: &K) -> Option<&V> {
self.map.get(k)
}
pub fn remove(&mut self, n: &N) -> Option<T> {
if let Some(t) = self.map.remove(n) {
self.removed.insert(n.clone());
pub fn contains_key(&self, k: &K) -> bool {
self.map.contains_key(k)
}
pub fn remove(&mut self, k: &K) -> Option<V> {
if let Some(t) = self.map.remove(k) {
self.removed.insert(k.clone());
Some(t)
} else {
None
}
}
pub fn get(&self, n: &N) -> Option<&T> {
self.map.get(n)
pub fn get_mut(&mut self, k: &K) -> Option<&mut V> {
self.map.get_mut(k)
}
}
@ -71,38 +75,45 @@ mod test {
#[test]
fn ut_queue_map_fifo() {
let mut map = QueueMap::new();
map.push_back(1, 1);
map.push_back(2, 2);
map.insert(1, 1);
map.insert(2, 2);
assert!(map.contains_key(&1));
assert_eq!(map.pop_front().unwrap(), 1);
assert_eq!(map.pop().unwrap(), 1);
}
#[test]
fn ut_queue_map_remove() {
let mut map = QueueMap::new();
map.push_back(1, 1);
map.push_back(2, 2);
map.push_back(3, 3);
map.push_back(4, 4);
map.insert(1, 1);
map.insert(2, 2);
map.insert(3, 3);
map.insert(4, 4);
map.remove(&1);
map.remove(&2);
map.remove(&3);
map.push_back(3, 3);
assert_eq!(map.pop_front().unwrap(), 3);
assert_eq!(map.pop_front().unwrap(), 4);
map.push_back(1, 1);
map.push_back(2, 2);
assert_eq!(map.pop_front().unwrap(), 1);
assert_eq!(map.pop_front().unwrap(), 2);
assert!(map.pop_front().is_none());
map.insert(3, 3);
assert_eq!(map.pop().unwrap(), 3);
assert_eq!(map.pop().unwrap(), 4);
map.insert(1, 1);
map.insert(2, 2);
assert_eq!(map.pop().unwrap(), 1);
assert_eq!(map.pop().unwrap(), 2);
assert!(map.pop().is_none());
}
#[test]
fn ut_queue_map_same_key() {
let mut map = QueueMap::new();
map.push_back(1, 1);
map.push_back(1, 2);
assert_eq!(map.pop_front().unwrap(), 2);
assert!(map.pop_front().is_none());
map.insert(1, 1);
map.insert(1, 2);
assert_eq!(map.pop().unwrap(), 2);
assert!(map.pop().is_none());
}
#[test]
fn ut_queue_map_insert_get() {
let mut map = QueueMap::new();
map.insert(1, 1);
assert_eq!(1, *map.get(&1).unwrap());
}
}

View File

@ -0,0 +1,89 @@
// Copyright (C) 2024 Huawei Device Co., Ltd.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::ffi::{c_char, CString};
use std::ptr::null;
pub use ffi::{LogLevel, LogType};
#[cxx::bridge]
mod ffi {
#[repr(i32)]
enum LogType {
// min log type
LOG_TYPE_MIN = 0,
// Used by app log.
LOG_APP = 0,
// Log to kmsg, only used by init phase.
LOG_INIT = 1,
// Used by core service, framework.
LOG_CORE = 3,
// Used by kmsg log.
LOG_KMSG = 4,
// Not print in release version.
LOG_ONLY_PRERELEASE = 5,
// max log type
LOG_TYPE_MAX,
}
// Log level
#[repr(i32)]
enum LogLevel {
// min log level
LOG_LEVEL_MIN = 0,
// Designates lower priority log.
LOG_DEBUG = 3,
// Designates useful information.
LOG_INFO = 4,
// Designates hazardous situations.
LOG_WARN = 5,
// Designates very serious errors.
LOG_ERROR = 6,
// Designates major fatal anomaly.
LOG_FATAL = 7,
// max log level
LOG_LEVEL_MAX,
}
unsafe extern "C++" {
include!("hilog/log.h");
type LogType;
type LogLevel;
}
}
pub fn hilog_print(level: LogLevel, domain: u32, tag: &str, mut fmt: String) {
let tag = tag.as_ptr() as *const c_char;
fmt.push('\0');
unsafe {
HiLogPrint(
LogType::LOG_CORE,
level,
domain,
tag,
fmt.as_ptr() as *const c_char,
);
}
}
extern "C" {
fn HiLogPrint(
log_type: ffi::LogType,
level: ffi::LogLevel,
domain: u32,
tag: *const c_char,
fmt: *const c_char,
...
) -> i32;
}