预加载cancel逻辑调整

Signed-off-by: fqwert <yanglv2@huawei.com>
Change-Id: I2e45fffee5953337a66c4d279c9525c925e21c69
This commit is contained in:
fqwert 2024-11-16 16:04:30 +08:00
parent cbd53e09d6
commit 85dc2c9444
6 changed files with 62 additions and 25 deletions

View File

@ -368,7 +368,7 @@ mod test {
let callback = Box::new(TestCallbackC {
flag: cancel_flag.clone(),
});
let handle = agent.preload(DownloadRequest::new(TEST_URL), callback, true);
let mut handle = agent.preload(DownloadRequest::new(TEST_URL), callback, true);
handle.cancel();
std::thread::sleep(Duration::from_secs(1));
assert_eq!(cancel_flag.load(Ordering::SeqCst), 1);
@ -401,9 +401,13 @@ mod test {
flag: cancel_flag_1.clone(),
});
let handle_0 = agent.preload(DownloadRequest::new(TEST_URL), callback_0, false);
agent.preload(DownloadRequest::new(TEST_URL), callback_1, false);
let mut handle_0 = agent.preload(DownloadRequest::new(TEST_URL), callback_0, false);
let mut handle_1 = agent.preload(DownloadRequest::new(TEST_URL), callback_1, false);
handle_0.cancel();
handle_0.cancel();
assert_eq!(cancel_flag_0.load(Ordering::SeqCst), 0);
assert_eq!(cancel_flag_1.load(Ordering::SeqCst), 0);
handle_1.cancel();
std::thread::sleep(Duration::from_secs(1));
assert_eq!(cancel_flag_0.load(Ordering::SeqCst), 1);
assert_eq!(cancel_flag_1.load(Ordering::SeqCst), 1);

View File

@ -36,7 +36,7 @@ impl Updater {
}
}
pub(crate) fn cancel(&self) {
pub(crate) fn cancel(&mut self) {
self.handle.cancel();
}

View File

@ -19,8 +19,7 @@ cfg_ohos! {
use netstack_rs::error::HttpErrorCode;
}
pub(crate) use super::CancelHandle;
use super::{DownloadTask, HttpClientError, RequestCallback, RequestTask, Response};
use super::{CancelHandle, DownloadTask, HttpClientError, RequestCallback, RequestTask, Response};
use crate::agent::{CustomCallback, DownloadRequest, TaskId};
use crate::cache::{CacheManager, RamCache};
use crate::{DownloadAgent, DownloadError};
@ -68,7 +67,6 @@ impl DownloadCallback {
impl DownloadCallback {
fn on_success_inner(&mut self) {
info!("{} success", self.task_id.brief());
self.state.store(SUCCESS, Ordering::Release);
let cache = match self.cache.take() {
Some(cache) => cache.finish_write(),
@ -78,6 +76,7 @@ impl DownloadCallback {
Some(0),
)),
};
self.state.store(SUCCESS, Ordering::Release);
self.finish.store(true, Ordering::Release);
let mut callbacks = self.callbacks.lock().unwrap();
while let Some(mut callback) = callbacks.pop() {
@ -199,8 +198,8 @@ impl TaskHandle {
callbacks: Arc::new(Mutex::new(vec![])),
}
}
pub(crate) fn cancel(&self) {
if let Some(handle) = self.cancel_handle.as_ref() {
pub(crate) fn cancel(&mut self) {
if let Some(handle) = self.cancel_handle.take() {
info!("cancel task {}", self.task_id.brief());
handle.cancel();
} else {
@ -233,6 +232,9 @@ impl TaskHandle {
if !self.finish.load(Ordering::Acquire) {
info!("add callback to task {}", self.task_id.brief());
callbacks.push(callback);
if let Some(handle) = self.cancel_handle.as_ref() {
handle.add_count();
}
Ok(())
} else {
Err(callback)

View File

@ -17,7 +17,7 @@ mod error;
cfg_ohos! {
mod netstack;
use netstack_rs::task::RequestTask;
pub use netstack::CancelHandle;
use netstack::CancelHandle;
use netstack::DownloadTask;
use netstack_rs::request::RequestCallback;
use netstack_rs::response::Response;
@ -26,7 +26,7 @@ cfg_ohos! {
cfg_not_ohos! {
mod ylong;
pub use ylong::CancelHandle;
use ylong::CancelHandle;
pub use ylong::RequestTask;
use ylong::DownloadTask;
use ylong::RequestCallback;

View File

@ -11,6 +11,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::atomic::AtomicUsize;
use std::sync::Arc;
use netstack_rs::request::Request;
use netstack_rs::task::RequestTask;
@ -20,7 +23,7 @@ use crate::agent::DownloadRequest;
pub(crate) struct DownloadTask;
impl DownloadTask {
pub(crate) fn run(input: DownloadRequest, callback: DownloadCallback) -> CancelHandle {
pub(super) fn run(input: DownloadRequest, callback: DownloadCallback) -> CancelHandle {
let mut request = Request::new();
request.url(input.url);
if let Some(headers) = input.headers {
@ -32,17 +35,33 @@ impl DownloadTask {
request.callback(callback);
let mut task = request.build();
task.start();
CancelHandle { inner: task }
CancelHandle::new(task)
}
}
#[derive(Clone)]
pub struct CancelHandle {
inner: RequestTask,
count: Arc<AtomicUsize>,
}
impl CancelHandle {
pub(crate) fn cancel(&self) {
self.inner.cancel();
fn new(inner: RequestTask) -> Self {
Self {
inner,
count: Arc::new(AtomicUsize::new(1)),
}
}
pub(super) fn add_count(&self) {
self.count.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
}
}
impl CancelHandle {
pub(super) fn cancel(&self) {
if self.count.fetch_sub(1, std::sync::atomic::Ordering::SeqCst) == 1 {
self.inner.cancel();
}
}
}

View File

@ -15,7 +15,7 @@ mod client;
use std::collections::HashMap;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll};
@ -80,21 +80,17 @@ impl<'a> RequestTask<'a> {
pub struct DownloadTask;
impl DownloadTask {
pub(crate) fn run(request: DownloadRequest, mut callback: DownloadCallback) -> CancelHandle {
pub(super) fn run(request: DownloadRequest, mut callback: DownloadCallback) -> CancelHandle {
let url = match PercentEncoder::encode(request.url) {
Ok(url) => url,
Err(e) => {
callback.on_fail(e);
return CancelHandle {
inner: Arc::new(AtomicBool::new(false)),
};
return CancelHandle::new(Arc::new(AtomicBool::new(false)));
}
};
callback.set_running();
let flag = Arc::new(AtomicBool::new(false));
let handle = CancelHandle {
inner: flag.clone(),
};
let handle = CancelHandle::new(flag.clone());
let mut headers = None;
if let Some(h) = request.headers {
headers = Some(
@ -167,11 +163,27 @@ impl Response {
#[derive(Clone)]
pub struct CancelHandle {
inner: Arc<AtomicBool>,
count: Arc<AtomicUsize>,
}
impl CancelHandle {
pub fn cancel(&self) {
self.inner.store(true, Ordering::Release);
fn new(inner: Arc<AtomicBool>) -> Self {
Self {
inner,
count: Arc::new(AtomicUsize::new(1)),
}
}
pub(super) fn add_count(&self) {
self.count.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
}
}
impl CancelHandle {
pub(super) fn cancel(&self) {
if self.count.fetch_sub(1, std::sync::atomic::Ordering::SeqCst) == 1 {
self.inner.store(true, Ordering::Release);
}
}
}