asynchronous task scheduling

Signed-off-by: hhh2 <pengfei.hou@huawei.com>
Change-Id: I937e4355f95f0e85ea47451f081ff4ce30bd4ae2
Signed-off-by: hhh2 <pengfei.hou@huawei.com>
This commit is contained in:
hhh2 2023-09-18 01:04:53 +00:00
parent f8b95f8b38
commit 4c8233762a
10 changed files with 1159 additions and 1 deletions

View File

@ -29,6 +29,7 @@ group("devicestatus_service_group") {
group("device_status_tests") {
testonly = true
deps = [
"${device_status_root_path}/rust/modules/scheduler/test:fusion_scheduler_test",
"frameworks/native/interaction/test:interaction_manager_test",
"services/native/test:devicestatussrv_test",
"test/fuzztest:device_status_fuzztest",

View File

@ -42,7 +42,8 @@
"dsoftbus",
"common",
"motion",
"build_framework"
"build_framework",
"ylong_runtime"
],
"third_party": [
"cJSON",

View File

@ -18,6 +18,7 @@
use std::ffi::NulError;
/// Error codes.
#[derive(Debug)]
#[repr(i32)]
pub enum FusionErrorCode {
/// Operation failed.

View File

@ -0,0 +1,35 @@
# Copyright (C) 2023 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import("//base/msdp/device_status/device_status.gni")
ohos_rust_shared_library("fusion_scheduler_rust") {
sources = [ "src/lib.rs" ]
deps = [
"${device_status_root_path}/rust/data/sys:fusion_data_rust",
"${device_status_root_path}/rust/utils:fusion_utils_rust",
"//third_party/rust/crates/libc:lib",
]
external_deps = [
"hilog:hilog_rust",
"ylong_runtime:ylong_runtime",
]
crate_name = "fusion_scheduler_rust"
crate_type = "dylib"
subsystem_name = "${device_status_subsystem_name}"
part_name = "${device_status_part_name}"
}

View File

@ -0,0 +1,231 @@
/*
* Copyright (C) 2023 Huawei Device Co., Ltd.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
//! Providing asynchronous task scheduling and epoll handling mechanism.
//!
//! Current implementation of task scheduler allow posted tasks to run concurrently,
//! so synchronization are necessary if they share some data.
//!
//! On creation of scheduler, an epoll instance is also created and event loop is
//! started. We see a (fd, events processing logic) pair as an event handler. When
//! an epoll handler is added, its fd is added to the interest list of the epoll
//! and is waited on for events. When events occured on its fd, scheduler will
//! dispatch events to it.
#![allow(dead_code)]
#![allow(unused_variables)]
/// Module declarations.
mod scheduler;
mod task;
/// Public exports.
pub use scheduler::IEpollHandler;
pub use scheduler::{
LIBC_EPOLLIN,
LIBC_EPOLLERR,
LIBC_EPOLLHUP,
};
pub use task::TaskHandle;
use std::ffi::{ c_char, CString };
use std::sync::Arc;
use std::sync::atomic::{ AtomicUsize, Ordering };
use std::time::Duration;
use fusion_data_rust::FusionInteractionResult;
use fusion_utils_rust::call_debug_enter;
use hilog_rust::{ hilog, HiLogLabel, LogType };
use scheduler::Scheduler;
const LOG_LABEL: HiLogLabel = HiLogLabel {
log_type: LogType::LogCore,
domain: 0xD002220,
tag: "Handler",
};
/// Front-end of scheduler, providing interface for posting asynchronous task
/// and epoll handling.
pub struct Handler {
id: usize,
scheduler: Arc<Scheduler>,
}
impl Handler {
/// Construct a new instance of `Handler`.
pub fn new() -> Self
{
static ID_RADIX: AtomicUsize = AtomicUsize::new(1);
let scheduler = Arc::new(Scheduler::new());
Self {
id: ID_RADIX.fetch_add(1, Ordering::Relaxed),
scheduler,
}
}
/// Return the unique identifier of this `Handler`.
pub fn id(&self) -> usize
{
self.id
}
/// Schedudle a `synchronous` executing task, and return the result.
pub fn post_sync_task<F, R>(&self, task: F) -> R
where
F: Fn() -> R + Send + 'static,
R: Send + 'static,
{
call_debug_enter!("Handler::post_sync_task");
ylong_runtime::block_on(async move {
task()
})
}
/// Scheduling an asynchronous task.
///
/// Calling `TaskHandle::result` to get the result of the task. Calling
/// `TaskHandle::result` will block current thread until the task finish.
///
/// Calling `TaskHandle::cancel` to cancel the posted task before it finish.
///
/// # Examples
///
/// ```
/// let handler = Handler::new();
/// let param: usize = 0xAB1807;
///
/// let mut task_handle = handler.post_async_task(move || {
/// hash(param)
/// }
/// let ret = task_handle.result().unwrap();
/// let expected = hash(param);
/// assert_eq!(ret, expected);
/// ```
///
pub fn post_async_task<F, R>(&self, task: F) -> TaskHandle<R>
where
F: Fn() -> R + Send + 'static,
R: Send + 'static,
{
call_debug_enter!("Handler::post_async_task");
let handle = ylong_runtime::spawn(async move {
task()
});
TaskHandle::from(handle)
}
/// Schedule an asynchronous task that will run after a period of `delay`.
///
/// Calling `TaskHandle::cancel` to cancel the posted task before it finish.
///
pub fn post_delayed_task<F, R>(&self, task: F, delay: Duration) -> TaskHandle<R>
where
F: Fn() -> R + Send + 'static,
R: Send + 'static,
{
call_debug_enter!("Handler::post_delayed_task");
let handle = ylong_runtime::spawn(async move {
ylong_runtime::time::sleep(delay).await;
task()
});
TaskHandle::from(handle)
}
/// Schedule an asynchronous task that will run repeatedly with set interval
/// after a period of time.
///
/// The posted task will start to run after a period of `delay` if `delay` is not None.
/// It will repeat for `repeat` times with `interval` between each running. If `repeat`
/// is None, the posted task will repeat forever.
///
/// Calling `TaskHandle::cancel` to cancel the posted task before it finish.
///
pub fn post_perioric_task<F>(&self, task: F, delay: Option<Duration>, interval: Duration,
repeat: Option<usize>) -> TaskHandle<()>
where
F: Fn() + Send + 'static
{
call_debug_enter!("Handler::post_perioric_task");
let handle = ylong_runtime::spawn(async move {
if let Some(d) = delay {
ylong_runtime::time::sleep(d).await;
}
ylong_runtime::time::periodic_schedule(task, repeat, interval).await;
});
TaskHandle::from(handle)
}
/// Schedule an asynchronous task that may block. That is, it may take a huge time to
/// finish, or may block for resources.
///
/// Calling `TaskHandle::cancel` to cancel the posted task before it finish.
///
pub fn post_blocking_task<F, R>(&self, task: F) -> TaskHandle<R>
where
F: Fn() -> R + Send + 'static,
R: Send + 'static,
{
call_debug_enter!("Handler::post_delayed_task");
let handle = ylong_runtime::spawn_blocking(task);
TaskHandle::from(handle)
}
/// Add an epoll handler to epoll event loop.
///
/// Note that we call a (fd, events processing logic) pair an event handler.
///
/// # Examples
///
/// ```
/// struct EpollHandler {
/// // data members.
/// }
///
/// impl IEpollHandler for EpollHandler {
/// fn fd(&self) -> RawFd {
/// // Return fd of this epoll handler.
/// }
///
/// fn dispatch(&self, events: u32) {
/// // Process events.
/// }
/// }
///
/// let handler: Arc<Handler> = Arc::default();
/// let epoll_handler = Arc::new(EpollHandler::new());
/// handler.add_epoll_handler(epoll_handler)
/// ```
pub fn add_epoll_handler(&self, handler: Arc<dyn IEpollHandler>)
-> FusionInteractionResult<Arc<dyn IEpollHandler>>
{
call_debug_enter!("Handler::add_epoll_handler");
self.scheduler.add_epoll_handler(handler)
}
/// Remove an epoll handler from epoll event loop.
pub fn remove_epoll_handler(&self, handler: Arc<dyn IEpollHandler>)
-> FusionInteractionResult<Arc<dyn IEpollHandler>>
{
call_debug_enter!("Handler::remove_epoll_handler");
self.scheduler.remove_epoll_handler(handler)
}
}
impl Default for Handler {
fn default() -> Self
{
Self::new()
}
}

View File

@ -0,0 +1,542 @@
/*
* Copyright (C) 2023 Huawei Device Co., Ltd.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
//! Implementation of epoll event loop.
#![allow(dead_code)]
#![allow(unused_variables)]
use std::collections::BTreeMap;
use std::ffi::{ c_void, c_char, c_int, CString };
use std::future::Future;
use std::io::Error;
use std::os::fd::RawFd;
use std::pin::Pin;
use std::sync::{ Arc, Mutex };
use std::sync::atomic::{ AtomicBool, Ordering };
use std::task::{ Context, Poll, Waker };
use fusion_data_rust::{ FusionErrorCode, FusionInteractionResult };
use fusion_utils_rust::{ call_debug_enter };
use hilog_rust::{ debug, info, error, hilog, HiLogLabel, LogType };
/// Indicating data other than high-priority data can be read.
pub const LIBC_EPOLLIN: u32 = libc::EPOLLIN as u32;
const LIBC_EPOLLONESHOT: u32 = libc::EPOLLONESHOT as u32;
/// Indicating an error has occurred.
pub const LIBC_EPOLLERR: u32 = libc::EPOLLERR as u32;
/// Indicating a hangup has occurred.
pub const LIBC_EPOLLHUP: u32 = libc::EPOLLHUP as u32;
const LIBC_EPOLLALL: u32 = LIBC_EPOLLIN | LIBC_EPOLLERR | LIBC_EPOLLHUP;
const LIBC_EPOLLNONE: u32 = 0;
const MAX_EPOLL_EVENTS: c_int = 128;
const EPOLL_SUCCESS: c_int = 0;
const EPOLL_FAILURE: c_int = -1;
const NO_TIMEOUT: c_int = -1;
const SYSTEM_IO_FAILURE: libc::ssize_t = -1;
const INVALID_FD: RawFd = -1;
const LOG_LABEL: HiLogLabel = HiLogLabel {
log_type: LogType::LogCore,
domain: 0xD002220,
tag: "Scheduler",
};
/// Abstraction of epoll handler.
pub trait IEpollHandler: Send + Sync {
/// Return file descriptor of this epoll handler.
fn fd(&self) -> RawFd;
/// Dispatch epoll events to this epoll handler.
fn dispatch(&self, events: u32);
}
struct EpollEvent {
fd: RawFd,
events: u32,
}
struct EpollHandler {
raw: Arc<dyn IEpollHandler>,
handle: ylong_runtime::task::JoinHandle<()>,
waker: Option<Waker>,
events: u32,
}
impl EpollHandler {
fn new(raw: Arc<dyn IEpollHandler>, handle: ylong_runtime::task::JoinHandle<()>) -> Self
{
Self {
raw,
handle,
waker: Default::default(),
events: Default::default(),
}
}
#[inline]
fn fd(&self) -> RawFd
{
self.raw.fd()
}
#[inline]
fn raw_handler(&self) -> Arc<dyn IEpollHandler>
{
self.raw.clone()
}
#[inline]
fn set_waker(&mut self, waker: &Waker)
{
self.waker.replace(waker.clone());
}
#[inline]
fn take_events(&mut self) -> u32
{
let events = self.events;
self.events = Default::default();
events
}
}
impl Drop for EpollHandler {
fn drop(&mut self)
{
self.handle.cancel();
}
}
/// `Driver` encapsulate event loop of epoll.
struct Driver {
epoll: Arc<Epoll>,
is_running: Arc<AtomicBool>,
}
impl Driver {
fn new(epoll: Arc<Epoll>, is_running: Arc<AtomicBool>) -> Self
{
Self { epoll, is_running }
}
#[inline]
fn is_running(&self) -> bool
{
self.is_running.load(Ordering::Relaxed)
}
fn run(&self)
{
call_debug_enter!("Driver::run");
while self.is_running() {
if let Some(epoll_events) = self.epoll.epoll_wait() {
if !self.is_running() {
info!(LOG_LABEL, "Driver stopped running");
break;
}
self.epoll.wake(&epoll_events);
}
}
}
}
struct Epoll {
epoll_fd: RawFd,
handlers: Mutex<BTreeMap<RawFd, EpollHandler>>,
}
impl Epoll {
fn new() -> Self
{
// SAFETY:
// The epoll API is multi-thread safe.
// This is a normal system call, no safety pitfall.
let epoll_fd = unsafe { libc::epoll_create1(libc::EPOLL_CLOEXEC) };
assert_ne!(epoll_fd, INVALID_FD, "epoll_create1 fail: {:?}", Error::last_os_error());
Self {
epoll_fd,
handlers: Mutex::default(),
}
}
#[inline]
fn fd(&self) -> RawFd
{
self.epoll_fd
}
fn epoll_add(&self, fd: RawFd) -> FusionInteractionResult<()>
{
call_debug_enter!("Epoll::epoll_add");
let mut ev = libc::epoll_event {
events: LIBC_EPOLLIN | LIBC_EPOLLONESHOT | LIBC_EPOLLHUP | LIBC_EPOLLERR,
u64: fd as u64,
};
// SAFETY:
// The epoll API is multi-thread safe.
// We have carefully ensure that parameters are as required by system interface.
let ret = unsafe {
libc::epoll_ctl(self.epoll_fd, libc::EPOLL_CTL_ADD, fd, &mut ev)
};
if ret != EPOLL_SUCCESS {
error!(LOG_LABEL, "epoll_ctl_add({},{}) fail: {:?}",
@public(self.epoll_fd), @public(fd), @public(Error::last_os_error()));
Err(FusionErrorCode::Fail)
} else {
Ok(())
}
}
fn epoll_del(&self, fd: RawFd) -> FusionInteractionResult<()>
{
call_debug_enter!("Epoll::epoll_del");
// SAFETY:
// The epoll API is multi-thread safe.
// We have carefully ensure that parameters are as required by system interface.
let ret = unsafe {
libc::epoll_ctl(self.epoll_fd, libc::EPOLL_CTL_DEL, fd, std::ptr::null_mut())
};
if ret != EPOLL_SUCCESS {
error!(LOG_LABEL, "epoll_ctl_remove({},{}) fail: {:?}",
@public(self.epoll_fd), @public(fd), @public(Error::last_os_error()));
Err(FusionErrorCode::Fail)
} else {
Ok(())
}
}
fn epoll_wait(&self) -> Option<Vec<EpollEvent>>
{
call_debug_enter!("Epoll::epoll_wait");
let mut events: Vec<libc::epoll_event> = Vec::with_capacity(MAX_EPOLL_EVENTS as usize);
// SAFETY:
// The epoll API is multi-thread safe.
// We have carefully ensure that parameters are as required by system interface.
let ret = unsafe {
libc::epoll_wait(self.epoll_fd, events.as_mut_ptr(), MAX_EPOLL_EVENTS, NO_TIMEOUT)
};
if ret < 0 {
error!(LOG_LABEL, "epoll_wait({}) fail: {:?}",
@public(self.epoll_fd),
@public(Error::last_os_error()));
return None;
}
let num_of_events = ret as usize;
// SAFETY:
// `epoll_wait` returns the number of events and promise it is within the limit of
// `MAX_EPOLL_EVENTS`.
let epoll_events = unsafe {
std::slice::from_raw_parts(events.as_ptr(), num_of_events)
};
let epoll_events: Vec<EpollEvent> = epoll_events.iter().map(|e| {
EpollEvent {
fd: e.u64 as RawFd,
events: e.events,
}
}).collect();
Some(epoll_events)
}
fn epoll_reset(&self, fd: RawFd) -> FusionInteractionResult<()>
{
call_debug_enter!("Epoll::epoll_reset");
let mut ev = libc::epoll_event {
events: LIBC_EPOLLIN | LIBC_EPOLLONESHOT | LIBC_EPOLLHUP | LIBC_EPOLLERR,
u64: fd as u64,
};
// SAFETY:
// The epoll API is multi-thread safe.
// We have carefully ensure that parameters are as required by system interface.
let ret = unsafe {
libc::epoll_ctl(self.epoll_fd, libc::EPOLL_CTL_MOD, fd, &mut ev)
};
if ret != EPOLL_SUCCESS {
error!(LOG_LABEL, "in reset_fd, epoll_ctl_mod({},{}) fail: {:?}",
@public(self.epoll_fd), @public(fd), @public(Error::last_os_error()));
Err(FusionErrorCode::Fail)
} else {
Ok(())
}
}
fn add_epoll_handler(&self, fd: RawFd, epoll_handler: EpollHandler)
-> FusionInteractionResult<Arc<dyn IEpollHandler>>
{
call_debug_enter!("Epoll::add_epoll_handler");
let mut guard = self.handlers.lock().unwrap();
if guard.contains_key(&fd) {
error!(LOG_LABEL, "Epoll handler ({}) has been added", @public(fd));
return Err(FusionErrorCode::Fail);
}
debug!(LOG_LABEL, "Add epoll handler ({})", @public(fd));
let raw = epoll_handler.raw_handler();
guard.insert(fd, epoll_handler);
let _ = self.epoll_add(fd);
Ok(raw)
}
fn remove_epoll_handler(&self, fd: RawFd) -> FusionInteractionResult<Arc<dyn IEpollHandler>>
{
call_debug_enter!("Epoll::remove_epoll_handler");
let mut guard = self.handlers.lock().unwrap();
let _ = self.epoll_del(fd);
if let Some(h) = guard.remove(&fd) {
debug!(LOG_LABEL, "Remove epoll handler ({})", @public(fd));
Ok(h.raw_handler())
} else {
error!(LOG_LABEL, "No epoll handler ({})", @public(fd));
Err(FusionErrorCode::Fail)
}
}
fn wake(&self, events: &[EpollEvent])
{
call_debug_enter!("Epoll::wake");
let mut guard = self.handlers.lock().unwrap();
for e in events {
if let Some(handler) = guard.get_mut(&e.fd) {
debug!(LOG_LABEL, "Wake epoll handler ({})", @public(e.fd));
handler.events = e.events;
if let Some(waker) = &handler.waker {
waker.wake_by_ref();
}
} else {
error!(LOG_LABEL, "No epoll handler ({})", @public(e.fd));
}
}
}
fn dispatch_inner(&self, fd: RawFd, waker: &Waker) -> Option<(Arc<dyn IEpollHandler>, u32)>
{
call_debug_enter!("Epoll::dispatch_inner");
let mut guard = self.handlers.lock().unwrap();
if let Some(handler) = guard.get_mut(&fd) {
handler.set_waker(waker);
let events = handler.take_events() & LIBC_EPOLLALL;
if events != LIBC_EPOLLNONE {
Some((handler.raw_handler(), events))
} else {
debug!(LOG_LABEL, "No epoll event");
None
}
} else {
error!(LOG_LABEL, "No epoll handler with ({})", @public(fd));
None
}
}
fn dispatch(&self, fd: RawFd, waker: &Waker)
{
call_debug_enter!("Epoll::dispatch");
if let Some((handler, events)) = self.dispatch_inner(fd, waker) {
handler.dispatch(events);
let _ = self.epoll_reset(fd);
}
}
}
impl Default for Epoll {
fn default() -> Self
{
Self::new()
}
}
impl Drop for Epoll {
fn drop(&mut self)
{
// SAFETY:
// Parameter is as required by system, so consider it safe here.
let ret = unsafe { libc::close(self.epoll_fd) };
if ret != 0 {
error!(LOG_LABEL, "close({}) fail: {:?}",
@public(self.epoll_fd),
@public(Error::last_os_error()));
}
}
}
struct EpollHandlerFuture {
fd: RawFd,
epoll: Arc<Epoll>,
}
impl EpollHandlerFuture {
fn new(fd: RawFd, epoll: Arc<Epoll>) -> Self
{
Self { fd, epoll }
}
}
impl Future for EpollHandlerFuture {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>
{
call_debug_enter!("EpollHandlerFuture::poll");
self.epoll.dispatch(self.fd, cx.waker());
Poll::Pending
}
}
struct EpollWaker {
fds: [RawFd; 2],
}
impl EpollWaker {
fn new() -> Self
{
let mut fds: [c_int; 2] = [-1; 2];
// SAFETY:
// The pipe API is multi-thread safe.
// We have carefully checked that parameters are as required by system interface.
let ret = unsafe {
libc::pipe2(fds.as_mut_ptr(), libc::O_CLOEXEC | libc::O_NONBLOCK)
};
if ret != 0 {
error!(LOG_LABEL, "pipe2 fail: {:?}", @public(Error::last_os_error()));
}
Self { fds }
}
fn wake(&self)
{
call_debug_enter!("EpollWaker::wake");
let data: i32 = 0;
// SAFETY:
// We have carefully checked that parameters are as required by system interface.
let ret = unsafe {
libc::write(self.fds[1],
std::ptr::addr_of!(data) as *const c_void,
std::mem::size_of_val(&data))
};
if ret == SYSTEM_IO_FAILURE {
error!(LOG_LABEL, "write fail: {:?}", @public(Error::last_os_error()));
}
}
}
impl IEpollHandler for EpollWaker {
fn fd(&self) -> RawFd
{
self.fds[0]
}
fn dispatch(&self, events: u32)
{
if (events & LIBC_EPOLLIN) == LIBC_EPOLLIN {
let data: i32 = 0;
// SAFETY:
// Parameters are as required by system and business logic, so it can be trusted.
let ret = unsafe {
libc::read(self.fd(),
std::ptr::addr_of!(data) as *mut c_void,
std::mem::size_of_val(&data))
};
if ret == SYSTEM_IO_FAILURE {
error!(LOG_LABEL, "read fail: {:?}", @public(Error::last_os_error()));
}
}
}
}
impl Default for EpollWaker {
fn default() -> Self
{
Self::new()
}
}
impl Drop for EpollWaker {
fn drop(&mut self)
{
for fd in &mut self.fds {
if *fd != INVALID_FD {
// SAFETY:
// Parameter is as required by system, so consider it safe here.
let ret = unsafe { libc::close(*fd) };
if ret != EPOLL_SUCCESS {
error!(LOG_LABEL, "close({}) fail: {:?}",
@public(*fd),
@public(Error::last_os_error()));
}
}
}
}
}
/// Bookkeeping of epoll handling.
pub struct Scheduler {
epoll: Arc<Epoll>,
epoll_waker: Arc<EpollWaker>,
is_running: Arc<AtomicBool>,
join_handle: Option<std::thread::JoinHandle<()>>,
}
impl Scheduler {
pub(crate) fn new() -> Self
{
call_debug_enter!("Scheduler::new");
let epoll: Arc<Epoll> = Arc::default();
let is_running = Arc::new(AtomicBool::new(true));
let driver = Driver::new(epoll.clone(), is_running.clone());
let join_handle = std::thread::spawn(move || {
driver.run();
});
let scheduler = Self {
epoll,
epoll_waker: Arc::default(),
is_running,
join_handle: Some(join_handle),
};
let _ = scheduler.add_epoll_handler(scheduler.epoll_waker.clone());
scheduler
}
pub(crate) fn add_epoll_handler(&self, handler: Arc<dyn IEpollHandler>)
-> FusionInteractionResult<Arc<dyn IEpollHandler>>
{
call_debug_enter!("Scheduler::add_epoll_handler");
let fd: RawFd = handler.fd();
let join_handle = ylong_runtime::spawn(
EpollHandlerFuture::new(fd, self.epoll.clone())
);
self.epoll.add_epoll_handler(fd, EpollHandler::new(handler, join_handle))
}
pub(crate) fn remove_epoll_handler(&self, handler: Arc<dyn IEpollHandler>)
-> FusionInteractionResult<Arc<dyn IEpollHandler>>
{
call_debug_enter!("Scheduler::remove_epoll_handler");
self.epoll.remove_epoll_handler(handler.fd())
}
}
impl Default for Scheduler {
fn default() -> Self
{
Self::new()
}
}
impl Drop for Scheduler {
fn drop(&mut self)
{
call_debug_enter!("Scheduler::drop");
self.is_running.store(false, Ordering::Relaxed);
self.epoll_waker.wake();
if let Some(join_handle) = self.join_handle.take() {
let _ = join_handle.join();
}
}
}

View File

@ -0,0 +1,57 @@
/*
* Copyright (C) 2023 Huawei Device Co., Ltd.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
//! Implementation of task handle.
use fusion_data_rust::{ FusionErrorCode, FusionInteractionResult };
use ylong_runtime::task::JoinHandle;
/// Representation of task handle.
pub struct TaskHandle<R> {
join_handle: Option<JoinHandle<R>>,
}
impl<R> From<JoinHandle<R>> for TaskHandle<R> {
fn from(value: JoinHandle<R>) -> Self
{
Self {
join_handle: Some(value),
}
}
}
impl<R> TaskHandle<R> {
/// Cancel this task.
pub fn cancel(&mut self)
{
if let Some(join_handle) = self.join_handle.take() {
join_handle.cancel();
}
}
/// Get result of this task.
pub fn result(&mut self) -> FusionInteractionResult<R>
{
if let Some(join_handle) = self.join_handle.take() {
if let Ok(ret) = ylong_runtime::block_on(join_handle) {
Ok(ret)
} else {
Err(FusionErrorCode::Fail)
}
} else {
Err(FusionErrorCode::Fail)
}
}
}

View File

@ -0,0 +1,30 @@
# Copyright (C) 2023 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import("//base/msdp/device_status/device_status.gni")
import("//build/test.gni")
ohos_rust_unittest("fusion_scheduler_test") {
module_out_path = "${device_status_part_name}/unit_out"
sources = [ "src/lib.rs" ]
deps = [
"${device_status_root_path}/rust/modules/scheduler/sys:fusion_scheduler_rust",
"${device_status_root_path}/rust/utils:fusion_utils_rust",
]
external_deps = [ "hilog:hilog_rust" ]
subsystem_name = "${device_status_subsystem_name}"
part_name = "${device_status_part_name}"
}

View File

@ -0,0 +1,259 @@
/*
* Copyright (C) 2023 Huawei Device Co., Ltd.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
//! TODO: add documentation.
#![feature(rustc_private)]
extern crate libc;
use std::ffi::{ c_void, c_char, c_int, CString };
use std::io::Error;
use std::os::fd::RawFd;
use std::sync::{ Arc, Condvar, Mutex };
use std::sync::atomic::{ AtomicI32, Ordering };
use std::time::Duration;
use hilog_rust::{ debug, info, error, hilog, HiLogLabel, LogType };
use fusion_scheduler_rust::{ Handler, IEpollHandler, LIBC_EPOLLIN };
use fusion_utils_rust::{ call_debug_enter };
const LOG_LABEL: HiLogLabel = HiLogLabel {
log_type: LogType::LogCore,
domain: 0xD002220,
tag: "FusionSchedulerTest",
};
struct EpollHandlerImpl {
fds: [RawFd; 2],
data: i32,
}
impl EpollHandlerImpl {
fn signal(&self, data: i32)
{
error!(LOG_LABEL, "EpollHandlerImpl::signal once");
let ret = unsafe {
libc::write(self.fds[1], std::ptr::addr_of!(data) as *const c_void, std::mem::size_of_val(&data))
};
if ret == -1 {
error!(LOG_LABEL, "libc::write fail");
}
}
fn fd(&self) -> RawFd
{
self.fds[0]
}
fn dispatch(&mut self, events: u32)
{
call_debug_enter!("EpollHandlerImpl::dispatch");
if (events & LIBC_EPOLLIN) == LIBC_EPOLLIN {
let data: i32 = 0;
let ret = unsafe {
libc::read(self.fds[0], std::ptr::addr_of!(data) as *mut c_void, std::mem::size_of_val(&data))
};
if ret == -1 {
error!(LOG_LABEL, "libc::read fail");
}
info!(LOG_LABEL, "EpollHandlerImpl::dispatch({}), data:{}", @public(self.fds[0]), @public(data));
self.data = data;
}
}
fn data(&self) -> i32
{
self.data
}
}
impl Drop for EpollHandlerImpl {
fn drop(&mut self)
{
for fd in &mut self.fds {
if *fd != -1 {
unsafe { libc::close(*fd) };
*fd = -1;
}
}
}
}
struct EpollHandler {
inner: Mutex<EpollHandlerImpl>,
var: Condvar,
}
impl EpollHandler {
fn new() -> Self
{
let mut fds: [c_int; 2] = [-1; 2];
let ret = unsafe { libc::pipe2(fds.as_mut_ptr(), libc::O_CLOEXEC | libc::O_NONBLOCK) };
if ret != 0 {
error!(LOG_LABEL, "in EpollHandler::new, libc::pipe2 fail:{:?}", @public(Error::last_os_error()));
}
debug!(LOG_LABEL, "in EpollHandler::new, fds:({},{})", @public(fds[0]), @public(fds[1]));
Self {
inner: Mutex::new(EpollHandlerImpl {
fds,
data: -1,
}),
var: Condvar::new(),
}
}
fn signal(&self, data: i32)
{
let guard = self.inner.lock().unwrap();
guard.signal(data);
}
fn data(&self) -> i32
{
let guard = self.inner.lock().unwrap();
guard.data()
}
fn wait(&self, dur: Duration) -> bool
{
call_debug_enter!("EpollHandler::wait");
let guard = self.inner.lock().unwrap();
let (_, ret) = self.var.wait_timeout(guard, dur).unwrap();
if ret.timed_out() {
info!(LOG_LABEL, "in EpollHandler::wait, timeout");
false
} else {
true
}
}
}
impl IEpollHandler for EpollHandler {
fn fd(&self) -> RawFd
{
let guard = self.inner.lock().unwrap();
guard.fd()
}
fn dispatch(&self, events: u32)
{
call_debug_enter!("EpollHandler::dispatch");
let mut guard = self.inner.lock().unwrap();
guard.dispatch(events);
self.var.notify_one();
}
}
#[test]
fn test_add_epoll_handler()
{
let handler: Arc<Handler> = Arc::default();
let epoll = Arc::new(EpollHandler::new());
assert!(handler.add_epoll_handler(epoll.clone()).is_ok());
let data: i32 = 13574;
epoll.signal(data);
assert!(epoll.wait(Duration::from_millis(100)));
info!(LOG_LABEL, "in test_add_epoll_handler, data:{}", @public(epoll.data()));
assert_eq!(epoll.data(), data);
assert!(handler.remove_epoll_handler(epoll).is_ok());
}
fn hash(param: usize) -> usize
{
const HASHER: usize = 0xAAAAAAAA;
HASHER ^ param
}
#[test]
fn test_post_sync_task()
{
let handler: Arc<Handler> = Arc::default();
let param: usize = 0xAB1807;
let ret = handler.post_sync_task(move || {
hash(param)
});
let expected = hash(param);
assert_eq!(ret, expected);
}
#[test]
fn test_post_async_task()
{
let handler: Arc<Handler> = Arc::default();
let param: usize = 0xAB1807;
let mut task_handle = handler.post_async_task(move || {
hash(param)
});
let ret = task_handle.result().unwrap();
let expected = hash(param);
assert_eq!(ret, expected);
}
#[test]
fn test_post_perioric_task()
{
let handler: Arc<Handler> = Arc::default();
let epoll = Arc::new(EpollHandler::new());
let cloned_epoll = epoll.clone();
assert!(handler.add_epoll_handler(epoll.clone()).is_ok());
let _ = handler.post_perioric_task(move || {
static ID_RADIX: AtomicI32 = AtomicI32::new(1);
cloned_epoll.signal(ID_RADIX.fetch_add(1, Ordering::Relaxed));
}, None, Duration::from_millis(100), Some(10));
std::thread::sleep(Duration::from_secs(1));
info!(LOG_LABEL, "in test_post_perioric_task, data:{}", @public(epoll.data()));
assert!(epoll.data() >= 10);
assert!(handler.remove_epoll_handler(epoll).is_ok());
}
#[test]
fn test_post_delayed_task()
{
let handler: Arc<Handler> = Arc::default();
let epoll = Arc::new(EpollHandler::new());
assert!(handler.add_epoll_handler(epoll.clone()).is_ok());
let data: i32 = 13547;
let cloned_epoll = epoll.clone();
let _ = handler.post_delayed_task(move || {
cloned_epoll.signal(data);
}, Duration::from_millis(10));
assert!(epoll.wait(Duration::from_millis(100)));
info!(LOG_LABEL, "in test_post_delayed_task, data:{}", @public(epoll.data()));
assert_eq!(epoll.data(), data);
assert!(handler.remove_epoll_handler(epoll).is_ok());
}
#[test]
fn test_post_blocking_task()
{
let handler: Arc<Handler> = Arc::default();
let param: usize = 0xAB1807;
let mut task_handle = handler.post_blocking_task(move || {
std::thread::sleep(Duration::from_millis(100));
hash(param)
});
let ret = task_handle.result().unwrap();
let expected = hash(param);
assert_eq!(ret, expected);
}

View File

@ -19,6 +19,7 @@ ohos_rust_shared_library("fusion_services_rust") {
deps = [
"${device_status_root_path}/rust/data/sys:fusion_data_rust",
"${device_status_root_path}/rust/modules/scheduler/sys:fusion_scheduler_rust",
"${device_status_root_path}/rust/services/binding:fusion_services_binding",
"${device_status_root_path}/rust/utils:fusion_utils_rust",
]