bugfix unload sa

Signed-off-by: qimeng <qimeng11@huawei.com>
This commit is contained in:
qimeng 2023-08-02 09:46:47 +08:00
parent 28f95ad7ad
commit d980939e72
2 changed files with 67 additions and 77 deletions

View File

@ -28,7 +28,8 @@ use ylong_runtime::{builder::RuntimeBuilder, executor::Runtime, join_handle::Joi
static MAX_TASK_COUNT: u32 = 300;
static MAX_TASK_COUNT_EACH_APP: u8 = 10;
static MAX_RUNNING_TASK_COUNT_EACH_APP: u8 = 5;
static MAX_RUNNING_TASK_COUNT_EACH_APP: u32 = 5; // api10
static MAX_RUNNING_TASK_COUNT_API9: u32 = 4;
static INTERVAL_MILLISECONDS: u64 = 30 * 60 * 1000;
static MILLISECONDS_IN_ONE_DAY: u64 = 24 * 60 * 60 * 1000;
static MILLISECONDS_IN_ONE_MONTH: u64 = 30 * 24 * 60 * 60 * 1000;
@ -48,7 +49,6 @@ pub struct TaskManager {
pub rt: Runtime,
pub front_notify_time: u64,
pub unloading: AtomicBool,
pub total_task_count: AtomicU32,
pub api10_background_task_count: AtomicU32,
pub recording_rdb_num: AtomicU32,
task_handles: Mutex<HashMap<u32, JoinHandle<()>>>,
@ -111,7 +111,6 @@ impl TaskManager {
.unwrap(),
front_notify_time: get_current_timestamp(),
unloading: AtomicBool::new(false),
total_task_count: AtomicU32::new(0),
api10_background_task_count: AtomicU32::new(0),
recording_rdb_num: AtomicU32::new(0),
task_handles: Mutex::new(HashMap::<u32, JoinHandle<()>>::new()),
@ -132,13 +131,12 @@ impl TaskManager {
self.rt.spawn(async {
loop {
let task_manager = TaskManager::get_instance();
let total_task_count = task_manager.total_task_count.load(Ordering::SeqCst);
let api10_background_task_count = task_manager.api10_background_task_count.load(Ordering::SeqCst);
let recording_rdb_num = task_manager.recording_rdb_num.load(Ordering::SeqCst);
let unloading = task_manager.unloading.load(Ordering::SeqCst);
info!(LOG_LABEL, "dump all task info, total_task_count:{}, api10_background_task_count:{},
recording_rdb_num:{}, unloading flag:{}", @public(total_task_count),
@public(api10_background_task_count), @public(recording_rdb_num),
info!(LOG_LABEL, "dump all task info, api10_background_task_count:{},
recording_rdb_num:{}, unloading flag:{}", @public(api10_background_task_count),
@public(recording_rdb_num),
@public(unloading));
{
let guard = task_manager.task_map.lock().unwrap();
@ -173,12 +171,18 @@ impl TaskManager {
}
let mut guard = self.task_map.lock().unwrap();
guard.clear();
self.total_task_count.store(0, Ordering::SeqCst);
self.api10_background_task_count.store(0, Ordering::SeqCst);
}
pub fn get_total_task_count(&self) -> u32 {
self.total_task_count.load(Ordering::SeqCst)
pub fn get_total_task_count(&self, guard: &MutexGuard<HashMap<u64, AppTask>>) -> u32 {
let mut total_task_count: u32 = 0;
if self.global_front_task.is_some() {
total_task_count += 1;
}
for (_, app_task) in guard.iter() {
total_task_count += app_task.len() as u32;
}
total_task_count
}
pub fn get_api10_background_task_count(&self) -> u32 {
@ -256,7 +260,6 @@ impl TaskManager {
}
if self.global_front_task.is_none() {
self.global_front_task = Some(Arc::new(task));
self.total_task_count.fetch_add(1, Ordering::SeqCst);
return ErrorCode::ErrOk;
}
self.global_front_task
@ -272,14 +275,12 @@ impl TaskManager {
if !self.add_task(uid, *task_id, Arc::new(task), &mut task_map_guard) {
return ErrorCode::TaskEnqueueErr;
}
self.total_task_count.fetch_add(1, Ordering::SeqCst);
self.api10_background_task_count
.fetch_add(1, Ordering::SeqCst);
return ErrorCode::ErrOk;
}
Version::API9 => {
self.add_task_api9(uid, *task_id, Arc::new(task), &mut task_map_guard);
self.total_task_count.fetch_add(1, Ordering::SeqCst);
return ErrorCode::ErrOk;
}
}
@ -384,37 +385,39 @@ impl TaskManager {
}
}
fn get_running_background_task_count(
fn reach_maximum_running_limit(
&self,
uid: u64,
version: Version,
limit: u32,
guard: &MutexGuard<HashMap<u64, AppTask>>,
) -> u8 {
let app_task = guard.get(&uid);
) -> bool {
let mut count = 0;
if app_task.is_none() {
debug!(LOG_LABEL, "the Application has not any background task");
return count;
}
for (_, task) in app_task.unwrap().iter() {
let state = task.status.lock().unwrap().state;
if task.conf.version == Version::API10
&& (state == State::RETRYING || state == State::RUNNING)
{
count += 1;
for (id, app_task) in guard.iter() {
if version == Version::API10 && uid != *id {
continue;
}
for (_, task) in app_task.iter() {
if task.conf.version != version {
continue;
}
let state = task.status.lock().unwrap().state;
if state == State::RETRYING || state == State::RUNNING {
count += 1;
}
if count >= limit {
return true;
}
}
}
debug!(LOG_LABEL,
"the running background task which belongs to the app is {}",
@public(count)
);
count
false
}
fn start_common(
&self,
uid: u64,
task: Arc<RequestTask>,
guard: &MutexGuard<HashMap<u64, AppTask>>,
guard: MutexGuard<HashMap<u64, AppTask>>,
) {
if !task.net_work_online() || !task.check_net_work_status() {
error!(LOG_LABEL, "check net work failed");
@ -424,10 +427,13 @@ impl TaskManager {
if state != State::INITIALIZED && state != State::WAITING && state != State::PAUSED {
return;
}
let vesion = task.conf.version;
if vesion == Version::API10 && task.conf.common_data.mode == Mode::BACKGROUND {
let running_task_count = self.get_running_background_task_count(uid, guard);
if running_task_count >= MAX_RUNNING_TASK_COUNT_EACH_APP {
if task.conf.common_data.mode == Mode::BACKGROUND {
let limit = if task.conf.version == Version::API10 {
MAX_RUNNING_TASK_COUNT_EACH_APP
} else {
MAX_RUNNING_TASK_COUNT_API9
};
if self.reach_maximum_running_limit(uid, task.conf.version, limit, &guard) {
info!(LOG_LABEL, "too many task in running state");
task.set_status(State::WAITING, Reason::RunningTaskMeetLimits);
return;
@ -460,7 +466,7 @@ impl TaskManager {
&self,
uid: u64,
task: Arc<RequestTask>,
guard: &mut MutexGuard<HashMap<u64, AppTask>>,
guard: MutexGuard<HashMap<u64, AppTask>>,
) {
self.start_common(uid, task.clone(), guard);
Self::get_instance().after_task_processed(&task);
@ -468,7 +474,7 @@ impl TaskManager {
pub fn start(&mut self, uid: u64, task_id: u32) -> ErrorCode {
info!(LOG_LABEL, "start a task, which task id is {}", @public(task_id));
let mut task_map_guard = self.task_map.lock().unwrap();
let task_map_guard = self.task_map.lock().unwrap();
let task = self.get_task(uid, task_id, &task_map_guard);
if let Some(task) = task {
let task_state = task.status.lock().unwrap().state;
@ -476,27 +482,33 @@ impl TaskManager {
error!(LOG_LABEL, "can not start a task which state is {}", @public(task_state as u32));
return ErrorCode::TaskStateErr;
}
self.start_inner(uid, task.clone(), &mut task_map_guard);
self.start_inner(uid, task.clone(), task_map_guard);
return ErrorCode::ErrOk;
}
error!(LOG_LABEL, "task not found");
ErrorCode::TaskStateErr
}
fn process_app_waitting_task(&self, uid: u64, guard: &MutexGuard<HashMap<u64, AppTask>>) {
let app_task = guard.get(&uid);
if let Some(app_task) = app_task {
for (_, request_task) in app_task.iter() {
let state = request_task.status.lock().unwrap().state;
fn process_waitting_task(&self, uid: u64, version: Version, guard: &MutexGuard<HashMap<u64, AppTask>>) {
for (id, app_task) in guard.iter() {
if version == Version::API10 && uid != *id {
continue;
}
for (_, task) in app_task.iter() {
if version != task.conf.version {
continue;
}
let state = task.status.lock().unwrap().state;
if state == State::WAITING {
debug!(LOG_LABEL, "begin process the task which in waitting state");
let task = request_task.clone();
let task = task.clone();
self.rt.spawn(async move {
let manager = TaskManager::get_instance();
let mut task_map_guard = manager.task_map.lock().unwrap();
manager.start_inner(uid, task, &mut task_map_guard);
let task_map_guard = manager.task_map.lock().unwrap();
manager.start_inner(uid, task, task_map_guard);
});
}
return;
}
}
}
@ -528,7 +540,7 @@ impl TaskManager {
pub fn resume(&self, uid: u64, task_id: u32) -> ErrorCode {
debug!(LOG_LABEL, "resume a task");
let mut task_map_guard = self.task_map.lock().unwrap();
let task_map_guard = self.task_map.lock().unwrap();
let task = self.get_task(uid, task_id, &task_map_guard);
if let Some(task) = task {
if task.conf.common_data.mode == Mode::FRONTEND {
@ -541,7 +553,7 @@ impl TaskManager {
return ErrorCode::TaskStateErr;
}
error!(LOG_LABEL, "resume the task success");
self.start_inner(uid, task.clone(), &mut task_map_guard);
self.start_inner(uid, task.clone(), task_map_guard);
return ErrorCode::ErrOk;
}
error!(LOG_LABEL, "task not found");
@ -719,8 +731,8 @@ pub async fn unload_sa() {
let task_manager = TaskManager::get_instance();
info!(LOG_LABEL, "unload SA end sleep");
match task_manager.task_map.try_lock() {
Ok(_) => {
let total_task_count = task_manager.total_task_count.load(Ordering::SeqCst);
Ok(guard) => {
let total_task_count = task_manager.get_total_task_count(&guard);
let recording_rdb_num = task_manager.recording_rdb_num.load(Ordering::SeqCst);
if total_task_count != 0 || recording_rdb_num != 0 {
info!(LOG_LABEL, "total_task_count is {}, recording_rdb_num is {}",
@ -756,7 +768,6 @@ async fn remove_task_from_map(task: Arc<RequestTask>) {
if let Some(v) = &task_manager.global_front_task {
if task.task_id == v.task_id {
task_manager.global_front_task.take();
task_manager.total_task_count.fetch_sub(1, Ordering::SeqCst);
return;
}
}
@ -778,7 +789,6 @@ async fn remove_task_from_map(task: Arc<RequestTask>) {
return;
}
let remove_task = remove_task.unwrap();
task_manager.total_task_count.fetch_sub(1, Ordering::SeqCst);
if remove_task.conf.version == Version::API9 {
let notify_data = remove_task.build_notify_data();
TaskManager::get_instance().front_notify("remove".into(), &notify_data);
@ -787,11 +797,8 @@ async fn remove_task_from_map(task: Arc<RequestTask>) {
}
if app_task.len() == 0 {
guard.remove(&remove_task.uid);
return;
}
if remove_task.conf.version == Version::API10 {
task_manager.process_app_waitting_task(remove_task.uid, &mut guard);
}
task_manager.process_waitting_task(remove_task.uid, remove_task.conf.version, &guard);
}
pub fn monitor_network() {
@ -845,8 +852,8 @@ extern "C" fn net_work_change_callback() {
task_manager.rt.spawn(async move {
sleep(Duration::from_secs(WAITTING_RETRY_INTERVAL)).await;
let manager = TaskManager::get_instance();
let mut guard = manager.task_map.lock().unwrap();
manager.start_inner(uid, task, &mut guard);
let guard = manager.task_map.lock().unwrap();
manager.start_inner(uid, task, guard);
});
}
}
@ -879,7 +886,7 @@ extern "C" fn update_app_state(uid: i32, state: i32) {
if uid as u64 != task_manager.global_front_task.as_ref().unwrap().uid {
return;
}
task_manager.global_front_task.as_ref().unwrap().set_status(State::STOPPED, Reason::AppBackgroundOrTerminate);
task_manager.global_front_task.take().unwrap().set_status(State::STOPPED, Reason::AppBackgroundOrTerminate);
}
}

View File

@ -47,10 +47,6 @@ fn create_test1() {
Version::API10,
);
assert_eq!(code, ErrorCode::TaskEnqueueErr);
assert_eq!(
task_manager.get_total_task_count(),
MAX_TASK_COUNT_EACH_APP_API10
);
assert_eq!(
task_manager.get_api10_background_task_count(),
MAX_TASK_COUNT_EACH_APP_API10
@ -62,7 +58,6 @@ fn create_test1() {
fn create_test2() {
let task_manager = TaskManager::get_instance();
task_manager.clear_all_task();
assert_eq!(task_manager.get_total_task_count(), 0);
assert_eq!(task_manager.get_api10_background_task_count(), 0);
let mut task_id: u32 = 0;
let mut paths: Vec<String> = Vec::new();
@ -78,7 +73,6 @@ fn create_test2() {
paths.push(file_name);
assert_eq!(code, ErrorCode::ErrOk);
}
assert_eq!(task_manager.get_total_task_count(), MAX_TASK_COUNT_API10);
assert_eq!(
task_manager.get_api10_background_task_count(),
MAX_TASK_COUNT_API10
@ -91,7 +85,6 @@ fn create_test2() {
Version::API10,
);
assert_eq!(code, ErrorCode::TaskEnqueueErr);
assert_eq!(task_manager.get_total_task_count(), MAX_TASK_COUNT_API10);
assert_eq!(
task_manager.get_api10_background_task_count(),
MAX_TASK_COUNT_API10
@ -118,7 +111,6 @@ fn create_test3() {
paths.push(file_name);
assert_eq!(code, ErrorCode::ErrOk);
}
assert_eq!(task_manager.get_total_task_count(), 10);
assert_eq!(
task_manager.get_api10_background_task_count(),
MAX_TASK_COUNT_EACH_APP_API10
@ -131,7 +123,6 @@ fn create_test3() {
Version::API10,
);
assert_eq!(code, ErrorCode::TaskModeErr);
assert_eq!(task_manager.get_total_task_count(), 10);
assert_eq!(
task_manager.get_api10_background_task_count(),
MAX_TASK_COUNT_EACH_APP_API10
@ -144,7 +135,6 @@ fn create_test3() {
Version::API9,
);
assert_eq!(code, ErrorCode::ErrOk);
assert_eq!(task_manager.get_total_task_count(), 11);
assert_eq!(
task_manager.get_api10_background_task_count(),
MAX_TASK_COUNT_EACH_APP_API10
@ -166,7 +156,6 @@ fn start_test1() {
Version::API10,
);
assert_eq!(code, ErrorCode::ErrOk);
assert_eq!(task_manager.get_total_task_count(), 1);
assert_eq!(task_manager.get_api10_background_task_count(), 1);
code = task_manager.start(uid, task_id);
assert_eq!(code, ErrorCode::ErrOk);
@ -240,7 +229,6 @@ fn stop_test() {
task_manager.start(uid, task_id);
code = task_manager.stop(uid, task_id);
assert_eq!(code, ErrorCode::ErrOk);
assert_eq!(task_manager.get_total_task_count(), 0);
assert_eq!(task_manager.get_api10_background_task_count(), 0);
code = common::construct_download_task(
&mut task_id,
@ -250,11 +238,9 @@ fn stop_test() {
Version::API9,
);
assert_eq!(code, ErrorCode::ErrOk);
assert_eq!(task_manager.get_total_task_count(), 1);
assert_eq!(task_manager.get_api10_background_task_count(), 0);
task_manager.start(uid, task_id);
code = task_manager.stop(uid, task_id);
assert_eq!(task_manager.get_total_task_count(), 0);
assert_eq!(task_manager.get_api10_background_task_count(), 0);
}
@ -281,12 +267,9 @@ fn remove_test() {
Version::API9,
);
assert_eq!(code, ErrorCode::ErrOk);
assert_eq!(task_manager.get_total_task_count(), 2);
assert_eq!(task_manager.get_api10_background_task_count(), 1);
code = task_manager.remove(uid, task_id1);
assert_eq!(task_manager.get_total_task_count(), 1);
assert_eq!(task_manager.get_api10_background_task_count(), 0);
code = task_manager.remove(uid, task_id2);
assert_eq!(task_manager.get_total_task_count(), 0);
assert_eq!(task_manager.get_api10_background_task_count(), 0);
}