!1073 preload 问题修复

Merge pull request !1073 from fqwert/sssx
This commit is contained in:
openharmony_ci 2024-11-15 10:25:48 +00:00 committed by Gitee
commit ab0f7a99f8
No known key found for this signature in database
GPG Key ID: 173E9B9CA92EEF8F
18 changed files with 211 additions and 282 deletions

View File

@ -119,10 +119,10 @@
"//base/request/request/test/unittest/cpp_test/pre_download:unittest", "//base/request/request/test/unittest/cpp_test/pre_download:unittest",
"//base/request/request/test/unittest/cpp_test/fwkTest:unittest", "//base/request/request/test/unittest/cpp_test/fwkTest:unittest",
"//base/request/request/services/tests:unittest", "//base/request/request/services/tests:unittest",
"//base/request/request/pre_download/netstack_rs/tests:unittest", "//base/request/request/pre_download/netstack_rs:unittest",
"//base/request/request/pre_download/native:rust_predownload_native_ut_test", "//base/request/request/pre_download/native:unittest",
"//base/request/request/pre_download/ffrt_rs:rust_request_ffrt_rs_ut_test", "//base/request/request/pre_download/ffrt_rs:unittest",
"//base/request/request/request_utils:rust_request_utils_ut_test" "//base/request/request/request_utils:unittest"
] ]
} }
} }

View File

@ -87,3 +87,8 @@ ohos_rust_unittest("rust_request_ffrt_rs_ut_test") {
subsystem_name = "request" subsystem_name = "request"
part_name = "request" part_name = "request"
} }
group("unittest") {
testonly = true
deps = [ ":rust_request_ffrt_rs_ut_test" ]
}

View File

@ -135,3 +135,8 @@ ohos_rust_unittest("rust_predownload_native_ut_test") {
subsystem_name = "request" subsystem_name = "request"
part_name = "request" part_name = "request"
} }
group("unittest") {
testonly = true
deps = [ ":rust_predownload_native_ut_test" ]
}

View File

@ -64,5 +64,5 @@ fn config() -> Criterion {
Criterion::default().sample_size(1000) Criterion::default().sample_size(1000)
} }
criterion_group! {name = agent; config = config();targets = predownload_benchmark_different_url, predownload_benchmark_same_url} criterion_group! {name = agent; config = config();targets = predownload_benchmark_same_url}
criterion_main!(agent); criterion_main!(agent);

View File

@ -36,7 +36,7 @@ pub trait CustomCallback: Send {
} }
pub struct DownloadAgent { pub struct DownloadAgent {
running_tasks: Mutex<HashMap<TaskId, Arc<Updater>>>, running_tasks: Mutex<HashMap<TaskId, Arc<Mutex<Updater>>>>,
} }
pub struct DownloadRequest<'a> { pub struct DownloadRequest<'a> {
@ -110,14 +110,14 @@ impl DownloadAgent {
pub fn cancel(&self, url: &str) { pub fn cancel(&self, url: &str) {
let task_id = TaskId::from_url(url); let task_id = TaskId::from_url(url);
if let Some(updater) = self.running_tasks.lock().unwrap().get(&task_id).cloned() { if let Some(updater) = self.running_tasks.lock().unwrap().get(&task_id).cloned() {
updater.cancel(); updater.lock().unwrap().cancel();
} }
} }
pub fn remove(&self, url: &str) { pub fn remove(&self, url: &str) {
let task_id = TaskId::from_url(url); let task_id = TaskId::from_url(url);
if let Some(updater) = self.running_tasks.lock().unwrap().remove(&task_id) { if let Some(updater) = self.running_tasks.lock().unwrap().remove(&task_id) {
updater.cancel(); updater.lock().unwrap().cancel();
} }
CacheManager::get_instance().remove(task_id); CacheManager::get_instance().remove(task_id);
} }
@ -143,32 +143,54 @@ impl DownloadAgent {
} }
} }
let updater = match self.running_tasks.lock().unwrap().entry(task_id.clone()) { loop {
Entry::Occupied(entry) => entry.get().clone(), let cb = callback;
Entry::Vacant(entry) => { let updater = match self.running_tasks.lock().unwrap().entry(task_id.clone()) {
info!("new pre_download task {}", task_id.brief()); Entry::Occupied(entry) => entry.get().clone(),
let updater = Arc::new(Updater::new(task_id.clone(), request, callback)); Entry::Vacant(entry) => {
let handle = updater.task_handle(); let updater =
entry.insert(updater); Arc::new(Mutex::new(Updater::new(task_id.clone(), request, cb, 0)));
return handle; let handle = updater.lock().unwrap().task_handle();
} entry.insert(updater);
}; return handle;
let mut handle = updater.handle.lock().unwrap();
match handle.try_add_callback(callback) {
Ok(()) => handle.clone(),
Err(callback) => {
if let Err(callback) = self.fetch(&task_id, callback) {
error!("{} fetch fail", task_id.brief());
*handle = Updater::new(task_id.clone(), request, callback).task_handle();
handle.clone()
} else {
info!("{} fetch success", task_id.brief());
let handle = TaskHandle::new(task_id);
handle.set_completed();
handle
} }
} };
let mut updater = updater.lock().unwrap();
let handle = match updater.try_add_callback(cb) {
Ok(()) => updater.task_handle(),
Err(cb) => {
if let Err(cb) = self.fetch(&task_id, cb) {
error!("{} fetch fail after update", task_id.brief());
if !updater.remove_flag {
let seq = updater.seq + 1;
*updater = Updater::new(task_id.clone(), request, cb, seq);
updater.task_handle()
} else {
callback = cb;
continue;
}
} else {
info!("{} fetch success", task_id.brief());
let handle = TaskHandle::new(task_id);
handle.set_completed();
handle
}
}
};
break handle;
}
}
pub(crate) fn task_finish(&self, task_id: &TaskId, seq: usize) {
let Some(updater) = self.running_tasks.lock().unwrap().get(task_id).cloned() else {
return;
};
let mut updater = updater.lock().unwrap();
if updater.seq == seq {
updater.remove_flag = true;
self.running_tasks.lock().unwrap().remove(task_id);
} }
} }
@ -198,10 +220,6 @@ impl DownloadAgent {
Box::new(self.pre_download(request, Box::new(callback), update)) Box::new(self.pre_download(request, Box::new(callback), update))
} }
pub(crate) fn task_finish(&self, task_id: &TaskId) {
self.running_tasks.lock().unwrap().remove(task_id);
}
pub fn set_file_cache_size(&self, size: u64) { pub fn set_file_cache_size(&self, size: u64) {
info!("set file cache size to {}", size); info!("set file cache size to {}", size);
CacheManager::get_instance().set_file_cache_size(size); CacheManager::get_instance().set_file_cache_size(size);
@ -349,7 +367,7 @@ mod test {
let callback = Box::new(TestCallbackC { let callback = Box::new(TestCallbackC {
flag: cancel_flag.clone(), flag: cancel_flag.clone(),
}); });
let mut handle = agent.pre_download(DownloadRequest::new(TEST_URL), callback, true); let handle = agent.pre_download(DownloadRequest::new(TEST_URL), callback, true);
handle.cancel(); handle.cancel();
std::thread::sleep(Duration::from_secs(1)); std::thread::sleep(Duration::from_secs(1));
assert_eq!(cancel_flag.load(Ordering::SeqCst), 1); assert_eq!(cancel_flag.load(Ordering::SeqCst), 1);
@ -382,7 +400,7 @@ mod test {
flag: cancel_flag_1.clone(), flag: cancel_flag_1.clone(),
}); });
let mut handle_0 = agent.pre_download(DownloadRequest::new(TEST_URL), callback_0, false); let handle_0 = agent.pre_download(DownloadRequest::new(TEST_URL), callback_0, false);
agent.pre_download(DownloadRequest::new(TEST_URL), callback_1, false); agent.pre_download(DownloadRequest::new(TEST_URL), callback_1, false);
handle_0.cancel(); handle_0.cancel();
std::thread::sleep(Duration::from_secs(1)); std::thread::sleep(Duration::from_secs(1));

View File

@ -264,13 +264,16 @@ impl CacheManager {
let size = file.metadata()?.size(); let size = file.metadata()?.size();
let mut cache = RamCache::try_new(task_id.clone(), self, size as usize) let mut cache = RamCache::new(task_id.clone(), self, Some(size as usize));
.unwrap_or_else(|| RamCache::temp(task_id.clone(), self, Some(size as usize)));
io::copy(&mut file, &mut cache).unwrap(); io::copy(&mut file, &mut cache).unwrap();
let is_cache = cache.check_size();
let cache = Arc::new(cache); let cache = Arc::new(cache);
if !cache.is_temp() {
self.update_from_file(task_id.clone(), cache.clone()); if is_cache {
self.update_ram_cache(cache.clone());
} }
ret = Some(cache.clone()); ret = Some(cache.clone());
let weak_cache = Arc::downgrade(&cache); let weak_cache = Arc::downgrade(&cache);
Ok(weak_cache) Ok(weak_cache)
@ -320,7 +323,7 @@ mod test {
for _ in 0..1000 { for _ in 0..1000 {
let task_id = TaskId::random(); let task_id = TaskId::random();
let mut ram_cache = let mut ram_cache =
RamCache::temp(task_id.clone(), &CACHE_MANAGER, Some(TEST_STRING_SIZE)); RamCache::new(task_id.clone(), &CACHE_MANAGER, Some(TEST_STRING_SIZE));
ram_cache.write_all(TEST_STRING.as_bytes()).unwrap(); ram_cache.write_all(TEST_STRING.as_bytes()).unwrap();
FileCache::try_create(task_id.clone(), &CACHE_MANAGER, Arc::new(ram_cache)).unwrap(); FileCache::try_create(task_id.clone(), &CACHE_MANAGER, Arc::new(ram_cache)).unwrap();
} }
@ -329,7 +332,7 @@ mod test {
for _ in 0..1000 { for _ in 0..1000 {
let task_id = TaskId::random(); let task_id = TaskId::random();
let mut ram_cache = let mut ram_cache =
RamCache::temp(task_id.clone(), &CACHE_MANAGER, Some(TEST_STRING_SIZE)); RamCache::new(task_id.clone(), &CACHE_MANAGER, Some(TEST_STRING_SIZE));
ram_cache.write_all(TEST_STRING.as_bytes()).unwrap(); ram_cache.write_all(TEST_STRING.as_bytes()).unwrap();
let file_cache = let file_cache =
FileCache::try_create(task_id.clone(), &CACHE_MANAGER, Arc::new(ram_cache)) FileCache::try_create(task_id.clone(), &CACHE_MANAGER, Arc::new(ram_cache))
@ -353,7 +356,7 @@ mod test {
while total < TEST_SIZE { while total < TEST_SIZE {
let task_id = TaskId::random(); let task_id = TaskId::random();
let mut ram_cache = let mut ram_cache =
RamCache::temp(task_id.clone(), &CACHE_MANAGER, Some(TEST_STRING_SIZE)); RamCache::new(task_id.clone(), &CACHE_MANAGER, Some(TEST_STRING_SIZE));
ram_cache.write_all(TEST_STRING.as_bytes()).unwrap(); ram_cache.write_all(TEST_STRING.as_bytes()).unwrap();
v.push( v.push(
FileCache::try_create(task_id.clone(), &CACHE_MANAGER, Arc::new(ram_cache)) FileCache::try_create(task_id.clone(), &CACHE_MANAGER, Arc::new(ram_cache))
@ -362,14 +365,14 @@ mod test {
total += TEST_STRING_SIZE as u64; total += TEST_STRING_SIZE as u64;
} }
let task_id = TaskId::random(); let task_id = TaskId::random();
let mut ram_cache = RamCache::temp(task_id.clone(), &CACHE_MANAGER, Some(TEST_STRING_SIZE)); let mut ram_cache = RamCache::new(task_id.clone(), &CACHE_MANAGER, Some(TEST_STRING_SIZE));
ram_cache.write_all(TEST_STRING.as_bytes()).unwrap(); ram_cache.write_all(TEST_STRING.as_bytes()).unwrap();
assert!( assert!(
FileCache::try_create(task_id.clone(), &CACHE_MANAGER, Arc::new(ram_cache)).is_none() FileCache::try_create(task_id.clone(), &CACHE_MANAGER, Arc::new(ram_cache)).is_none()
); );
v.pop(); v.pop();
let task_id = TaskId::random(); let task_id = TaskId::random();
let mut ram_cache = RamCache::temp(task_id.clone(), &CACHE_MANAGER, Some(TEST_STRING_SIZE)); let mut ram_cache = RamCache::new(task_id.clone(), &CACHE_MANAGER, Some(TEST_STRING_SIZE));
ram_cache.write_all(TEST_STRING.as_bytes()).unwrap(); ram_cache.write_all(TEST_STRING.as_bytes()).unwrap();
FileCache::try_create(task_id.clone(), &CACHE_MANAGER, Arc::new(ram_cache)).unwrap(); FileCache::try_create(task_id.clone(), &CACHE_MANAGER, Arc::new(ram_cache)).unwrap();
} }
@ -381,7 +384,7 @@ mod test {
CACHE_MANAGER.set_file_cache_size(TEST_SIZE); CACHE_MANAGER.set_file_cache_size(TEST_SIZE);
let task_id = TaskId::random(); let task_id = TaskId::random();
let mut ram_cache = RamCache::temp(task_id.clone(), &CACHE_MANAGER, Some(TEST_STRING_SIZE)); let mut ram_cache = RamCache::new(task_id.clone(), &CACHE_MANAGER, Some(TEST_STRING_SIZE));
ram_cache.write_all(TEST_STRING.as_bytes()).unwrap(); ram_cache.write_all(TEST_STRING.as_bytes()).unwrap();
let file_cache = let file_cache =
FileCache::try_create(task_id.clone(), &CACHE_MANAGER, Arc::new(ram_cache)).unwrap(); FileCache::try_create(task_id.clone(), &CACHE_MANAGER, Arc::new(ram_cache)).unwrap();
@ -400,7 +403,7 @@ mod test {
CACHE_MANAGER.set_file_cache_size(TEST_SIZE); CACHE_MANAGER.set_file_cache_size(TEST_SIZE);
let task_id = TaskId::random(); let task_id = TaskId::random();
let mut ram_cache = RamCache::temp(task_id.clone(), &CACHE_MANAGER, Some(TEST_STRING_SIZE)); let mut ram_cache = RamCache::new(task_id.clone(), &CACHE_MANAGER, Some(TEST_STRING_SIZE));
ram_cache.write_all(TEST_STRING.as_bytes()).unwrap(); ram_cache.write_all(TEST_STRING.as_bytes()).unwrap();
let file_cache = let file_cache =
FileCache::try_create(task_id.clone(), &CACHE_MANAGER, Arc::new(ram_cache)).unwrap(); FileCache::try_create(task_id.clone(), &CACHE_MANAGER, Arc::new(ram_cache)).unwrap();
@ -450,7 +453,7 @@ mod test {
CACHE_MANAGER.set_file_cache_size(TEST_SIZE); CACHE_MANAGER.set_file_cache_size(TEST_SIZE);
let task_id = TaskId::random(); let task_id = TaskId::random();
let mut ram_cache = RamCache::temp(task_id.clone(), &CACHE_MANAGER, Some(TEST_STRING_SIZE)); let mut ram_cache = RamCache::new(task_id.clone(), &CACHE_MANAGER, Some(TEST_STRING_SIZE));
ram_cache.write_all(TEST_STRING.as_bytes()).unwrap(); ram_cache.write_all(TEST_STRING.as_bytes()).unwrap();
let file_cache = let file_cache =
FileCache::try_create(task_id.clone(), &CACHE_MANAGER, Arc::new(ram_cache)).unwrap(); FileCache::try_create(task_id.clone(), &CACHE_MANAGER, Arc::new(ram_cache)).unwrap();

View File

@ -25,8 +25,6 @@ pub struct RamCache {
data: Vec<u8>, data: Vec<u8>,
applied: u64, applied: u64,
handle: &'static CacheManager, handle: &'static CacheManager,
is_temp: bool,
} }
impl Drop for RamCache { impl Drop for RamCache {
@ -43,61 +41,53 @@ impl Drop for RamCache {
} }
impl RamCache { impl RamCache {
pub(crate) fn temp( pub(crate) fn new(task_id: TaskId, handle: &'static CacheManager, size: Option<usize>) -> Self {
task_id: TaskId, let applied = match size {
handle: &'static CacheManager, Some(size) => {
size: Option<usize>, if CacheManager::apply_cache(
) -> Self { &handle.ram_handle,
&handle.rams,
|a| RamCache::task_id(a),
size,
) {
info!(
"apply ram cache {} for task {} success",
size,
task_id.brief()
);
size as u64
} else {
error!(
"apply ram cache {} for task {} failed",
size,
task_id.brief()
);
0
}
}
None => 0,
};
Self { Self {
task_id, task_id,
data: Vec::with_capacity(size.unwrap_or(DEFAULT_TRUNK_CAPACITY)), data: Vec::with_capacity(size.unwrap_or(DEFAULT_TRUNK_CAPACITY)),
applied: 0, applied,
handle, handle,
is_temp: true,
} }
} }
pub(crate) fn try_new(
task_id: TaskId,
handle: &'static CacheManager,
size: usize,
) -> Option<Self> {
info!(
"try apply new ram cache {} for task {}",
size,
task_id.brief()
);
if !CacheManager::apply_cache(
&handle.ram_handle,
&handle.rams,
|a| RamCache::task_id(a),
size,
) {
info!("apply ram cache for task {} failed", task_id.brief());
return None;
}
info!("apply ram cache for task {} success", task_id.brief());
Some(Self {
task_id,
data: Vec::with_capacity(size),
applied: size as u64,
handle,
is_temp: false,
})
}
pub(crate) fn finish_write(mut self) -> Arc<RamCache> { pub(crate) fn finish_write(mut self) -> Arc<RamCache> {
if self.is_temp || !self.check_size() { let is_cache = self.check_size();
return Arc::new(self);
}
let me = Arc::new(self); let me = Arc::new(self);
me.handle.update_cache(me.clone());
if is_cache {
me.handle.update_ram_cache(me.clone());
}
me.handle.update_file_cache(me.task_id.clone(), me.clone());
me me
} }
fn check_size(&mut self) -> bool { pub(crate) fn check_size(&mut self) -> bool {
match (self.data.len() as u64).cmp(&self.applied) { match (self.data.len() as u64).cmp(&self.applied) {
Ordering::Equal => true, Ordering::Equal => true,
Ordering::Greater => { Ordering::Greater => {
@ -146,10 +136,6 @@ impl RamCache {
pub(crate) fn cursor(&self) -> Cursor<&[u8]> { pub(crate) fn cursor(&self) -> Cursor<&[u8]> {
Cursor::new(&self.data) Cursor::new(&self.data)
} }
pub(super) fn is_temp(&self) -> bool {
self.is_temp
}
} }
impl Write for RamCache { impl Write for RamCache {
@ -163,15 +149,9 @@ impl Write for RamCache {
} }
impl CacheManager { impl CacheManager {
fn update_cache(&'static self, cache: Arc<RamCache>) { pub(crate) fn update_ram_cache(&'static self, cache: Arc<RamCache>) {
self.update_cache_inner(cache.task_id().clone(), cache, false); let task_id = cache.task_id().clone();
}
pub(super) fn update_from_file(&'static self, task_id: TaskId, cache: Arc<RamCache>) {
self.update_cache_inner(task_id, cache, true);
}
fn update_cache_inner(&'static self, task_id: TaskId, cache: Arc<RamCache>, from_file: bool) {
if self if self
.rams .rams
.lock() .lock()
@ -183,10 +163,6 @@ impl CacheManager {
info!("{} old caches delete", task_id.brief()); info!("{} old caches delete", task_id.brief());
} }
self.update_from_file_once.lock().unwrap().remove(&task_id); self.update_from_file_once.lock().unwrap().remove(&task_id);
info!("{} ram cache updated", task_id.brief());
if !from_file {
self.update_file_cache(task_id, cache);
}
} }
} }
@ -212,25 +188,23 @@ mod test {
// cache not update // cache not update
for _ in 0..1000 { for _ in 0..1000 {
let task_id = TaskId::random(); let task_id = TaskId::random();
let mut cache = let mut cache = RamCache::new(task_id.clone(), &CACHE_MANAGER, Some(TEST_STRING_SIZE));
RamCache::try_new(task_id.clone(), &CACHE_MANAGER, TEST_STRING_SIZE).unwrap();
cache.write_all(TEST_STRING.as_bytes()).unwrap(); cache.write_all(TEST_STRING.as_bytes()).unwrap();
} }
// cache update // cache update
for _ in 0..1000 { for _ in 0..1000 {
let task_id = TaskId::random(); let task_id = TaskId::random();
let mut cache = let mut cache = RamCache::new(task_id.clone(), &CACHE_MANAGER, Some(TEST_STRING_SIZE));
RamCache::try_new(task_id.clone(), &CACHE_MANAGER, TEST_STRING_SIZE).unwrap();
cache.write_all(TEST_STRING.as_bytes()).unwrap(); cache.write_all(TEST_STRING.as_bytes()).unwrap();
CACHE_MANAGER.update_cache_inner(task_id, Arc::new(cache), true); CACHE_MANAGER.update_ram_cache(Arc::new(cache));
} }
// cache update and save to file // cache update and save to file
for _ in 0..1000 { for _ in 0..1000 {
let task_id = TaskId::random(); let task_id = TaskId::random();
let mut cache = let mut cache = RamCache::new(task_id.clone(), &CACHE_MANAGER, Some(TEST_STRING_SIZE));
RamCache::try_new(task_id.clone(), &CACHE_MANAGER, TEST_STRING_SIZE).unwrap();
cache.write_all(TEST_STRING.as_bytes()).unwrap(); cache.write_all(TEST_STRING.as_bytes()).unwrap();
cache.finish_write(); cache.finish_write();
assert!(CACHE_MANAGER.rams.lock().unwrap().contains_key(&task_id)); assert!(CACHE_MANAGER.rams.lock().unwrap().contains_key(&task_id));
@ -248,12 +222,19 @@ mod test {
let mut v = vec![]; let mut v = vec![];
while total < TEST_SIZE { while total < TEST_SIZE {
let task_id = TaskId::random(); let task_id = TaskId::random();
v.push(RamCache::try_new(task_id.clone(), &CACHE_MANAGER, TEST_STRING_SIZE).unwrap()); v.push(RamCache::new(
task_id.clone(),
&CACHE_MANAGER,
Some(TEST_STRING_SIZE),
));
total += TEST_STRING_SIZE as u64; total += TEST_STRING_SIZE as u64;
} }
assert!(RamCache::try_new(TaskId::random(), &CACHE_MANAGER, TEST_STRING_SIZE).is_none()); assert_eq!(
RamCache::new(TaskId::random(), &CACHE_MANAGER, Some(TEST_STRING_SIZE)).applied,
0
);
v.pop(); v.pop();
RamCache::try_new(TaskId::random(), &CACHE_MANAGER, TEST_STRING_SIZE).unwrap(); RamCache::new(TaskId::random(), &CACHE_MANAGER, Some(TEST_STRING_SIZE));
} }
#[test] #[test]
fn ut_cache_ram_drop() { fn ut_cache_ram_drop() {
@ -262,7 +243,7 @@ mod test {
CACHE_MANAGER.set_ram_cache_size(TEST_SIZE); CACHE_MANAGER.set_ram_cache_size(TEST_SIZE);
let task_id = TaskId::random(); let task_id = TaskId::random();
let cache = RamCache::try_new(task_id.clone(), &CACHE_MANAGER, TEST_STRING_SIZE).unwrap(); let cache = RamCache::new(task_id.clone(), &CACHE_MANAGER, Some(TEST_STRING_SIZE));
assert_eq!( assert_eq!(
CACHE_MANAGER.ram_handle.lock().unwrap().used_ram, CACHE_MANAGER.ram_handle.lock().unwrap().used_ram,
TEST_STRING_SIZE as u64 TEST_STRING_SIZE as u64
@ -276,27 +257,5 @@ mod test {
init(); init();
static CACHE_MANAGER: LazyLock<CacheManager> = LazyLock::new(CacheManager::new); static CACHE_MANAGER: LazyLock<CacheManager> = LazyLock::new(CacheManager::new);
CACHE_MANAGER.set_ram_cache_size(TEST_SIZE); CACHE_MANAGER.set_ram_cache_size(TEST_SIZE);
let task_id = TaskId::random();
let _cache = RamCache::try_new(task_id, &CACHE_MANAGER, TEST_STRING_SIZE).unwrap();
let task_id = TaskId::random();
let cache_temp = RamCache::temp(task_id.clone(), &CACHE_MANAGER, Some(TEST_STRING_SIZE));
// temp cache do not apply or release ram size.
assert_eq!(
CACHE_MANAGER.ram_handle.lock().unwrap().used_ram,
TEST_STRING_SIZE as u64
);
drop(cache_temp);
assert_eq!(
CACHE_MANAGER.ram_handle.lock().unwrap().used_ram,
TEST_STRING_SIZE as u64
);
// temp cache do not update to cache manager.
let task_id = TaskId::random();
let cache_temp = RamCache::temp(task_id.clone(), &CACHE_MANAGER, Some(TEST_STRING_SIZE));
cache_temp.finish_write();
assert!(!CACHE_MANAGER.rams.lock().unwrap().contains_key(&task_id));
} }
} }

View File

@ -131,8 +131,7 @@ mod test {
static CACHE_MANAGER: LazyLock<CacheManager> = LazyLock::new(CacheManager::new); static CACHE_MANAGER: LazyLock<CacheManager> = LazyLock::new(CacheManager::new);
// update cache // update cache
let mut cache = let mut cache = RamCache::new(task_id.clone(), &CACHE_MANAGER, Some(TEST_STRING_SIZE));
RamCache::try_new(task_id.clone(), &CACHE_MANAGER, TEST_STRING_SIZE).unwrap();
cache.write_all(TEST_STRING.as_bytes()).unwrap(); cache.write_all(TEST_STRING.as_bytes()).unwrap();
cache.finish_write(); cache.finish_write();
thread::sleep(Duration::from_millis(100)); thread::sleep(Duration::from_millis(100));
@ -164,8 +163,7 @@ mod test {
let task_id = TaskId::random(); let task_id = TaskId::random();
static CACHE_MANAGER: LazyLock<CacheManager> = LazyLock::new(CacheManager::new); static CACHE_MANAGER: LazyLock<CacheManager> = LazyLock::new(CacheManager::new);
let mut cache = let mut cache = RamCache::new(task_id.clone(), &CACHE_MANAGER, Some(TEST_STRING_SIZE));
RamCache::try_new(task_id.clone(), &CACHE_MANAGER, TEST_STRING_SIZE).unwrap();
cache.write_all(TEST_STRING.as_bytes()).unwrap(); cache.write_all(TEST_STRING.as_bytes()).unwrap();
cache.finish_write(); cache.finish_write();
@ -182,8 +180,7 @@ mod test {
let task_id = TaskId::random(); let task_id = TaskId::random();
static CACHE_MANAGER: LazyLock<CacheManager> = LazyLock::new(CacheManager::new); static CACHE_MANAGER: LazyLock<CacheManager> = LazyLock::new(CacheManager::new);
let mut cache = let mut cache = RamCache::new(task_id.clone(), &CACHE_MANAGER, Some(TEST_STRING_SIZE));
RamCache::try_new(task_id.clone(), &CACHE_MANAGER, TEST_STRING_SIZE).unwrap();
cache.write_all(TEST_STRING.as_bytes()).unwrap(); cache.write_all(TEST_STRING.as_bytes()).unwrap();
cache.finish_write(); cache.finish_write();
@ -211,8 +208,7 @@ mod test {
let task_id = TaskId::random(); let task_id = TaskId::random();
static CACHE_MANAGER: LazyLock<CacheManager> = LazyLock::new(CacheManager::new); static CACHE_MANAGER: LazyLock<CacheManager> = LazyLock::new(CacheManager::new);
let mut cache = let mut cache = RamCache::new(task_id.clone(), &CACHE_MANAGER, Some(TEST_STRING_SIZE));
RamCache::try_new(task_id.clone(), &CACHE_MANAGER, TEST_STRING_SIZE).unwrap();
cache.write_all(TEST_STRING.as_bytes()).unwrap(); cache.write_all(TEST_STRING.as_bytes()).unwrap();
cache.finish_write(); cache.finish_write();
thread::sleep(Duration::from_millis(100)); thread::sleep(Duration::from_millis(100));
@ -238,8 +234,7 @@ mod test {
let task_id = TaskId::random(); let task_id = TaskId::random();
static CACHE_MANAGER: LazyLock<CacheManager> = LazyLock::new(CacheManager::new); static CACHE_MANAGER: LazyLock<CacheManager> = LazyLock::new(CacheManager::new);
let mut cache = let mut cache = RamCache::new(task_id.clone(), &CACHE_MANAGER, Some(TEST_STRING_SIZE));
RamCache::try_new(task_id.clone(), &CACHE_MANAGER, TEST_STRING_SIZE).unwrap();
cache.write_all(TEST_STRING.as_bytes()).unwrap(); cache.write_all(TEST_STRING.as_bytes()).unwrap();
cache.finish_write(); cache.finish_write();
@ -247,8 +242,7 @@ mod test {
let mut test_string = TEST_STRING.to_string(); let mut test_string = TEST_STRING.to_string();
test_string.push_str(TEST_STRING); test_string.push_str(TEST_STRING);
let mut cache = let mut cache = RamCache::new(task_id.clone(), &CACHE_MANAGER, Some(test_string.len()));
RamCache::try_new(task_id.clone(), &CACHE_MANAGER, test_string.len()).unwrap();
cache.write_all(test_string.as_bytes()).unwrap(); cache.write_all(test_string.as_bytes()).unwrap();
cache.finish_write(); cache.finish_write();

View File

@ -11,13 +11,13 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
use std::sync::Mutex;
use crate::agent::{CustomCallback, DownloadRequest, TaskId}; use crate::agent::{CustomCallback, DownloadRequest, TaskId};
use crate::download::{download, TaskHandle}; use crate::download::{download, TaskHandle};
pub(crate) struct Updater { pub(crate) struct Updater {
pub(crate) handle: Mutex<TaskHandle>, pub(crate) remove_flag: bool,
pub(crate) seq: usize,
pub(crate) handle: TaskHandle,
} }
impl Updater { impl Updater {
@ -25,18 +25,29 @@ impl Updater {
task_id: TaskId, task_id: TaskId,
request: DownloadRequest, request: DownloadRequest,
callback: Box<dyn CustomCallback>, callback: Box<dyn CustomCallback>,
seq: usize,
) -> Self { ) -> Self {
let task_handle = download(task_id, request, Some(callback)); info!("new pre_download task {} seq {}", task_id.brief(), seq);
let task_handle = download(task_id, request, Some(callback), seq);
Self { Self {
handle: Mutex::new(task_handle), handle: task_handle,
remove_flag: false,
seq,
} }
} }
pub(crate) fn cancel(&self) { pub(crate) fn cancel(&self) {
self.handle.lock().unwrap().cancel(); self.handle.cancel();
} }
pub(crate) fn task_handle(&self) -> TaskHandle { pub(crate) fn task_handle(&self) -> TaskHandle {
self.handle.lock().unwrap().clone() self.handle.clone()
}
pub(crate) fn try_add_callback(
&mut self,
callback: Box<dyn CustomCallback>,
) -> Result<(), Box<dyn CustomCallback>> {
self.handle.try_add_callback(callback)
} }
} }

View File

@ -38,6 +38,7 @@ pub(crate) struct DownloadCallback {
state: Arc<AtomicUsize>, state: Arc<AtomicUsize>,
callbacks: Arc<Mutex<Vec<Box<dyn CustomCallback>>>>, callbacks: Arc<Mutex<Vec<Box<dyn CustomCallback>>>>,
processed: u64, processed: u64,
seq: usize,
} }
impl DownloadCallback { impl DownloadCallback {
@ -46,6 +47,7 @@ impl DownloadCallback {
finish: Arc<AtomicBool>, finish: Arc<AtomicBool>,
callbacks: Arc<Mutex<Vec<Box<dyn CustomCallback>>>>, callbacks: Arc<Mutex<Vec<Box<dyn CustomCallback>>>>,
state: Arc<AtomicUsize>, state: Arc<AtomicUsize>,
seq: usize,
) -> Self { ) -> Self {
Self { Self {
task_id, task_id,
@ -54,6 +56,7 @@ impl DownloadCallback {
finish, finish,
callbacks, callbacks,
processed: 0, processed: 0,
seq,
} }
} }
@ -69,7 +72,7 @@ impl DownloadCallback {
let cache = match self.cache.take() { let cache = match self.cache.take() {
Some(cache) => cache.finish_write(), Some(cache) => cache.finish_write(),
None => Arc::new(RamCache::temp( None => Arc::new(RamCache::new(
self.task_id.clone(), self.task_id.clone(),
CacheManager::get_instance(), CacheManager::get_instance(),
Some(0), Some(0),
@ -97,7 +100,7 @@ impl DownloadCallback {
} }
fn notify_agent_finish(&self) { fn notify_agent_finish(&self) {
DownloadAgent::get_instance().task_finish(&self.task_id); DownloadAgent::get_instance().task_finish(&self.task_id, self.seq);
} }
} }
@ -117,11 +120,16 @@ impl RequestCallback for DownloadCallback {
} }
fn on_fail(&mut self, error: HttpClientError) { fn on_fail(&mut self, error: HttpClientError) {
#[cfg(feature = "ohos")]
if *error.code() == HttpErrorCode::HttpWriteError {
self.on_cancel();
return;
}
self.on_fail_inner(error); self.on_fail_inner(error);
} }
fn on_cancel(&mut self) { fn on_cancel(&mut self) {
info!("{} cancel", self.task_id.brief()); info!("{} is cancel", self.task_id.brief());
self.state.store(CANCEL, Ordering::Release); self.state.store(CANCEL, Ordering::Release);
self.finish.store(true, Ordering::Release); self.finish.store(true, Ordering::Release);
@ -151,19 +159,9 @@ impl RequestCallback for DownloadCallback {
}; };
info!("{} content-length info {:?}", self.task_id.brief(), size); info!("{} content-length info {:?}", self.task_id.brief(), size);
let apply_cache = match size { let apply_cache =
Some(size) => { RamCache::new(self.task_id.clone(), CacheManager::get_instance(), size);
RamCache::try_new(self.task_id.clone(), CacheManager::get_instance(), size)
.unwrap_or_else(|| {
RamCache::temp(
self.task_id.clone(),
CacheManager::get_instance(),
Some(size),
)
})
}
None => RamCache::temp(self.task_id.clone(), CacheManager::get_instance(), None),
};
self.cache = Some(apply_cache) self.cache = Some(apply_cache)
} }
self.cache.as_mut().unwrap().write_all(data).unwrap(); self.cache.as_mut().unwrap().write_all(data).unwrap();
@ -201,8 +199,9 @@ impl TaskHandle {
callbacks: Arc::new(Mutex::new(vec![])), callbacks: Arc::new(Mutex::new(vec![])),
} }
} }
pub(crate) fn cancel(&mut self) { pub(crate) fn cancel(&self) {
if let Some(handle) = self.cancel_handle.take() { if let Some(handle) = self.cancel_handle.as_ref() {
info!("cancel task {}", self.task_id.brief());
handle.cancel(); handle.cancel();
} else { } else {
error!("cancel task {} not exist", self.task_id.brief()); error!("cancel task {} not exist", self.task_id.brief());
@ -265,6 +264,7 @@ pub(crate) fn download(
task_id: TaskId, task_id: TaskId,
request: DownloadRequest, request: DownloadRequest,
callback: Option<Box<dyn CustomCallback>>, callback: Option<Box<dyn CustomCallback>>,
seq: usize,
) -> TaskHandle { ) -> TaskHandle {
let mut handle = TaskHandle::new(task_id.clone()); let mut handle = TaskHandle::new(task_id.clone());
if let Some(callback) = callback { if let Some(callback) = callback {
@ -276,6 +276,7 @@ pub(crate) fn download(
handle.finish_flag(), handle.finish_flag(),
handle.callbacks(), handle.callbacks(),
handle.state_flag(), handle.state_flag(),
seq,
); );
let cancel_handle = DownloadTask::run(request, callback); let cancel_handle = DownloadTask::run(request, callback);
handle.set_cancel_handle(cancel_handle); handle.set_cancel_handle(cancel_handle);
@ -317,6 +318,7 @@ mod test {
Some(Box::new(TestCallback { Some(Box::new(TestCallback {
flag: success_flag.clone(), flag: success_flag.clone(),
})), })),
0,
); );
std::thread::sleep(Duration::from_secs(1)); std::thread::sleep(Duration::from_secs(1));
assert!(success_flag.load(Ordering::Acquire)); assert!(success_flag.load(Ordering::Acquire));
@ -355,7 +357,7 @@ mod test {
let server = test_server(test_f); let server = test_server(test_f);
let mut request = DownloadRequest::new(&server); let mut request = DownloadRequest::new(&server);
request.headers(headers); request.headers(headers);
download(TaskId::from_url(&server), request, None); download(TaskId::from_url(&server), request, None, 0);
std::thread::sleep(Duration::from_millis(2000)); std::thread::sleep(Duration::from_millis(2000));
assert!(flag.load(Ordering::SeqCst)); assert!(flag.load(Ordering::SeqCst));
} }

View File

@ -42,7 +42,7 @@ pub struct CancelHandle {
} }
impl CancelHandle { impl CancelHandle {
pub(crate) fn cancel(mut self) { pub(crate) fn cancel(&self) {
self.inner.cancel(); self.inner.cancel();
} }
} }

View File

@ -170,7 +170,7 @@ pub struct CancelHandle {
} }
impl CancelHandle { impl CancelHandle {
pub fn cancel(self) { pub fn cancel(&self) {
self.inner.store(true, Ordering::Release); self.inner.store(true, Ordering::Release);
} }
} }

View File

@ -12,6 +12,7 @@
# limitations under the License. # limitations under the License.
import("//build/ohos.gni") import("//build/ohos.gni")
import("//build/test.gni")
rust_cxx("netstack_rs_cxx_gen") { rust_cxx("netstack_rs_cxx_gen") {
sources = [ "src/wrapper.rs" ] sources = [ "src/wrapper.rs" ]
@ -58,7 +59,19 @@ ohos_rust_static_library("netstack_rs") {
sources = [ "src/lib.rs" ] sources = [ "src/lib.rs" ]
external_deps = [] deps = [
":netstack_rs_cxx",
"//third_party/rust/crates/cxx:lib",
]
subsystem_name = "request"
part_name = "request"
}
ohos_rust_unittest("rust_netstack_rs_ut_test") {
module_out_path = "request/netstack_rs"
sources = [ "src/lib.rs" ]
deps = [ deps = [
":netstack_rs_cxx", ":netstack_rs_cxx",
@ -68,3 +81,8 @@ ohos_rust_static_library("netstack_rs") {
subsystem_name = "request" subsystem_name = "request"
part_name = "request" part_name = "request"
} }
group("unittest") {
testonly = true
deps = [ ":rust_netstack_rs_ut_test" ]
}

View File

@ -58,7 +58,7 @@ impl Debug for HttpClientError {
} }
} }
#[derive(Debug, Default, Clone)] #[derive(Debug, Default, Clone, PartialEq, Eq)]
pub enum HttpErrorCode { pub enum HttpErrorCode {
HttpNoneErr, HttpNoneErr,
HttpPermissionDeniedCode = 201, HttpPermissionDeniedCode = 201,

View File

@ -28,6 +28,7 @@ pub struct RequestTask {
} }
unsafe impl Send for RequestTask {} unsafe impl Send for RequestTask {}
unsafe impl Sync for RequestTask {}
/// RequestTask status /// RequestTask status
#[derive(Debug, Default)] #[derive(Debug, Default)]
@ -56,8 +57,8 @@ impl RequestTask {
} }
/// cancel the request task /// cancel the request task
pub fn cancel(&mut self) { pub fn cancel(&self) {
self.pin_mut().Cancel() self.pin_mut().Cancel();
} }
/// get the request task status /// get the request task status
@ -84,7 +85,7 @@ impl RequestTask {
); );
} }
fn pin_mut(&mut self) -> Pin<&mut HttpClientTask> { fn pin_mut(&self) -> Pin<&mut HttpClientTask> {
let ptr = self.inner.as_ref().unwrap() as *const HttpClientTask as *mut HttpClientTask; let ptr = self.inner.as_ref().unwrap() as *const HttpClientTask as *mut HttpClientTask;
unsafe { Pin::new_unchecked(ptr.as_mut().unwrap()) } unsafe { Pin::new_unchecked(ptr.as_mut().unwrap()) }
} }

View File

@ -1,55 +0,0 @@
# Copyright (c) 2024 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import("//base/request/request/request_aafwk.gni")
import("//build/test.gni")
ohos_rust_unittest("rust_netstack_rs_ut_test") {
module_out_path = "request/netstack_rs"
sources = [ "../src/lib.rs" ]
external_deps = []
deps = [
"../../../request_utils:request_utils",
"../../netstack_rs:netstack_rs_cxx",
"//third_party/rust/crates/cxx:lib",
]
external_deps = [ "hilog:hilog_rust" ]
subsystem_name = "request"
part_name = "request"
}
ohos_rust_unittest("rust_netstack_rs_sdv_test") {
module_out_path = "request/netstack_rs"
sources = [ "download.rs" ]
deps = [ "../../netstack_rs:netstack_rs" ]
subsystem_name = "request"
part_name = "request"
}
group("unittest") {
testonly = true
deps = []
if (!use_clang_coverage) {
deps = [
":rust_netstack_rs_sdv_test",
":rust_netstack_rs_ut_test",
]
}
}

View File

@ -1,37 +0,0 @@
// Copyright (C) 2024 Huawei Device Co., Ltd.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#![allow(unused)]
use std::time::Duration;
use netstack_rs::request::{Request, RequestCallback};
use netstack_rs::response::Response;
struct Callback {}
impl RequestCallback for Callback {
fn on_fail(&mut self, error: netstack_rs::error::HttpClientError) {}
}
#[test]
fn download() {
let mut request = Request::new();
request
.url("http://192.168.0.101/bind.png")
.method("GET")
.callback(Callback {});
let mut task = request.build();
task.start();
let status = task.status();
println!("{:?}", status);
}

View File

@ -90,3 +90,8 @@ ohos_rust_unittest("rust_request_utils_ut_test") {
subsystem_name = "request" subsystem_name = "request"
part_name = "request" part_name = "request"
} }
group("unittest") {
testonly = true
deps = [ ":rust_request_utils_ut_test" ]
}