!11 删除ylong_runtime的tokio封装

Merge pull request !11 from fqwert/ylong_runtime
This commit is contained in:
openharmony_ci 2023-08-25 05:29:38 +00:00 committed by Gitee
commit 8c5a67d345
No known key found for this signature in database
GPG Key ID: 173E9B9CA92EEF8F
37 changed files with 0 additions and 2470 deletions

View File

@ -1,35 +0,0 @@
# Copyright (c) 2023 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import("//build/ohos.gni")
ohos_rust_shared_library("lib") {
crate_name = "ylong_runtime"
crate_type = "dylib"
crate_root = "src/lib.rs"
part_name = "ylong_runtime"
subsystem_name = "thirdparty"
sources = [ "src/lib.rs" ]
deps = [ "//third_party/rust/crates/tokio/tokio:lib" ]
features = [
"full",
"multi_thread_runtime",
"current_thread_runtime",
"fs",
"sync",
"timer",
"net",
]
}

View File

@ -1,34 +0,0 @@
[package]
name = "ylong_runtime"
version = "0.1.0"
authors = ["Chen Mingyu <chenmingyu4@huawei.com>"]
edition = "2018"
description = "Async Runtime"
license = "MIT or Apache-2.0"
keywords = ["tokio", "runtime", "async"]
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
tokio = { version = "=1.20.1", features = ["rt", "rt-multi-thread", "io-util"] }
[features]
full = [
"current_thread_runtime",
"fs",
"sync",
"timer",
"net",
]
multi_thread_runtime = []
current_thread_runtime = []
fs = ["tokio/fs"]
sync = ["tokio/sync"]
timer = ["tokio/time"]
net = ["tokio/net"]

View File

@ -1,24 +0,0 @@
# ylong_runtime
A tokio-runtime wrapper crate, provides basic runtime functionalities:
1. async net io
2. async file io
3. sync
4. timer
## Overview
This crate wraps tokio's apis. The only difference from tokio is that the runtime instance is a global
singleton object. The first call to ``ylong_ruintime::spawn`` or ``ylong_runtime::spawn_blocking`` will
initialize the global runtime. Users could also configure the runtime using ``RuntimeBuilder`` before
spawning.
## Background
This crate's purpose is to uniform runtime's apis, so users could ignore the underlying scheduler.
## How to use
Please add the following dependency in your ``BUILD.gn``
```json
deps += ["//third_party/rust/crate/ylong_runtime"]
```
[tokio readme](../README.md)

View File

@ -1,20 +0,0 @@
# ylong_runtime
该库是一个``tokio``的封装库,提供了以下运行时组件:
1. 异步网络io
2. 异步文件io
3. 同步原语
4. 定时器
## ylong_runtime简介
ylong_runtime库封装了tokio的基础接口并在上层提供了一个全局单例的运行时实例。用户在调用
``ylong_runtime::spawn``或是``ylong_runtime::spawn_blocking``时ylong_runtime
将会自动启动一个默认配置的运行时。用户也可以在启动运行时前,通过``RuntimeBuilder``进行配置。
## 如何使用
在您的 BUILD.gn 需要的地方添加依赖即可。
```json
deps += ["//third_party/rust/crate/ylong_runtime"]
```
[tokio的原生README](../README.md)

View File

@ -1,37 +0,0 @@
{
"name": "@ohos/ylong_runtime",
"description": "Ylong runtime implementation",
"version": "4.0",
"license": "MIT",
"publishAs": "code-segment",
"segment": {
"destPath": "third_party/rust/crates/tokio/ylong_runtime"
},
"dirs": {},
"scripts": {},
"licensePath": "COPYING",
"readmePath": {
"en": "README.md",
"zh": "README_zh.md"
},
"component": {
"name": "ylong_runtime",
"subsystem": "thirdparty",
"syscap": [],
"features": [],
"adapted_system_type": [],
"rom": "1500KB",
"ram": "5000KB",
"deps": {
"components": [],
"third_party": []
},
"build": {
"sub_component": [
"//third_party/rust/crates/tokio/ylong_runtime:lib"
],
"inner_kits": [],
"test": []
}
}
}

View File

@ -1,73 +0,0 @@
use std::time::Duration;
pub(crate) struct CommonBuilder {
pub(crate) thread_name: Option<String>,
pub(crate) keep_alive_time: Option<Duration>,
pub(crate) stack_size: Option<usize>,
}
impl CommonBuilder {
pub(crate) fn new() -> Self {
Self {
thread_name: None,
keep_alive_time: None,
stack_size: None,
}
}
}
macro_rules! impl_common {
($self:ident) => {
use std::time::Duration;
use tokio::runtime::Builder;
impl $self {
/// Sets the name prefix for all worker threads
pub fn thread_name(mut self, name: String) -> Self {
self.common.thread_name = Some(name);
self
}
/// Sets the stack size for every worker thread that gets spawned by the runtime
/// The minimum stack size is 1.
pub fn thread_stack_size(mut self, stack_size: usize) -> Self {
if stack_size < 1 {
self.common.stack_size = Some(1);
} else {
self.common.stack_size = Some(stack_size);
}
self
}
/// Sets how long will the thread be kept alvie inside the blocking pool after
/// it becomes idle.
pub fn keep_alive_time(mut self, keep_alive_time: Duration) -> Self {
self.common.keep_alive_time = Some(keep_alive_time);
self
}
pub(crate) fn build_common(&self, mut builder: Builder) -> Builder {
if let Some(stack_size) = self.common.stack_size {
builder.thread_stack_size(stack_size);
}
if let Some(keep_alive) = self.common.keep_alive_time {
builder.thread_keep_alive(keep_alive);
}
if let Some(name) = &self.common.thread_name {
builder.thread_name(name);
}
#[cfg(any(feature = "net", feature = "timer"))]
builder.enable_all();
builder
}
}
};
}
pub(crate) use impl_common;

View File

@ -1,23 +0,0 @@
use crate::builder::common_builder::{impl_common, CommonBuilder};
use std::io;
pub struct CurrentThreadBuilder {
pub(crate) common: CommonBuilder,
}
impl CurrentThreadBuilder {
pub(crate) fn new() -> Self {
CurrentThreadBuilder {
common: CommonBuilder::new(),
}
}
/// Initializes the runtime and returns its instance.
pub fn build(&mut self) -> io::Result<crate::executor::Runtime> {
let mut runtime = self.build_common(Builder::new_current_thread());
Ok(crate::executor::Runtime(runtime.build()?))
}
}
impl_common!(CurrentThreadBuilder);

View File

@ -1,55 +0,0 @@
#[cfg(feature = "current_thread_runtime")]
use crate::builder::current_thread_builder::CurrentThreadBuilder;
use crate::builder::multi_thread_builder::MultiThreadBuilder;
pub(crate) mod common_builder;
#[cfg(feature = "current_thread_runtime")]
pub mod current_thread_builder;
pub mod multi_thread_builder;
/// Builder to build the runtime. Provides methods to customize the runtime, such
/// as setting thread pool size, worker thread stack size, work thread name prefix and etc.
///
/// if `multi_thread_runtime` or `current_thread_runtime` features is turned on:
/// After setting the RuntimeBuilder, a call to `build` will initialize the runtime
/// and returns its instance. If there is an invalid parameter during the build, an
/// error would be returned.
///
/// Otherwise:
/// RuntimeBuilder wiull not have the `build()` method, instead, this builder should
/// be passed to set the global executor (not implemented yet)
///
/// # Examples
///
/// ```no run
/// #![cfg(feature = "multi_thread_runtime")]
///
/// use ylong_runtime::builder::RuntimeBuilder;
/// use ylong_runtime::executor::Runtime;
///
/// let runtime = RuntimeBUilder::new_multi_thread()
/// .thread_number(4)
/// .thread_stack_size(1024 * 30)
/// .build()
/// .unwrap();
/// ```
pub struct RuntimeBuilder;
impl RuntimeBuilder {
/// Initializes a new RuntimeBuilder with current_thread settings.
///
/// All tasks will run on the current thread, which means it does not create any other
/// worker threads.
#[cfg(feature = "current_thread_runtime")]
pub fn new_current_thread() -> CurrentThreadBuilder {
CurrentThreadBuilder::new()
}
/// Initializes a new RuntimeBuilder with multi_thread settings.
///
/// When running, worker threads will be created according to the builder configuration,
/// and tasks will be allocated and run in the newly created worker thread pool.
pub fn new_multi_thread() -> MultiThreadBuilder {
MultiThreadBuilder::new()
}
}

View File

@ -1,59 +0,0 @@
use crate::builder::common_builder::{impl_common, CommonBuilder};
use tokio::io;
pub struct MultiThreadBuilder {
pub(crate) common: CommonBuilder,
pub(crate) thread_num: Option<u8>,
}
impl MultiThreadBuilder {
pub(crate) fn new() -> Self {
MultiThreadBuilder {
common: CommonBuilder::new(),
thread_num: None,
}
}
/// Initializes the runtime and returns its instance.
#[cfg(feature = "multi_thread_runtime")]
pub fn build(&mut self) -> io::Result<crate::executor::Runtime> {
self.build_inner()
}
pub(crate) fn build_inner(&mut self) -> io::Result<crate::executor::Runtime> {
let mut runtime = self.build_common(Builder::new_multi_thread());
if let Some(thread_num) = self.thread_num {
runtime.worker_threads(thread_num as usize);
};
Ok(crate::executor::Runtime(runtime.build()?))
}
/// Sets the number of core worker threads.
///
/// The boundary of the thread number is 1-64:
/// If sets a number smaller than 1, then thread number would be set to 1.
/// If sets a number larger than 64, then thread number would be set to 64.
/// The default number of the core threads is the number of cores of the cpu.
///
/// # Examples
///
/// ```
/// use ylong_runtime::builder::RuntimeBuilder;
///
/// let runtime = RuntimeBuilder::new_multi_thread()
/// .thread_number(8);
/// ```
pub fn thread_number(mut self, thread_num: u8) -> Self {
if thread_num < 1 {
self.thread_num = Some(1);
} else if thread_num > 64 {
self.thread_num = Some(64);
} else {
self.thread_num = Some(thread_num)
}
self
}
}
impl_common!(MultiThreadBuilder);

View File

@ -1,92 +0,0 @@
use crate::builder::RuntimeBuilder;
use crate::join_handle::JoinHandle;
use std::future::Future;
use std::mem::MaybeUninit;
use std::sync::Once;
pub struct Runtime(pub(crate) tokio::runtime::Runtime);
impl Runtime {
/// Spawns a future onto the runtime, returning a [`JoinHandle`] for it.
///
/// The `future` will be later polled by the executor, which is usually implemented as a thread
/// pool. The executor will run the future until finished.
///
/// Awaits on the JoinHandle will return the result of the future, but users don't have to
/// explicitly `.await` the `JoinHandle` in order to run the future, since the future will
/// be executed in the background once it's spawned. Dropping the JoinHandle will throw away
/// the returned value.
///
/// # Examples
///
/// ```no run
/// use ylong_runtime::builder::RuntimeBuilder;
/// let runtime = RuntimeBuilder::new_current_thread().build().unwrap();
/// let handle = runtime.spawn(async move {
/// 1
/// });
/// assert_eq!(runtime.block_on(handle).unwrap(), 1);
/// ```
pub fn spawn<T, R>(&self, task: T) -> JoinHandle<R>
where
T: Future<Output = R> + Send + 'static,
R: Send + 'static,
{
let handle = self.0.spawn(task);
JoinHandle(handle)
}
/// Spawns the provided function or closure onto the runtime.
///
/// It's usually used for cpu-bounded computation that does not return pending and takes
/// a relatively long time.
///
/// # Examples
///
/// ```no run
/// use ylong_runtime::builder::RuntimeBuilder;
/// let runtime = RuntimeBuilder::new_current_thread().build().unwrap();
/// let handle = runtime.spawn_blocking(move || {
/// 10
/// });
/// assert_eq!(runtime.block_on(handle).unwrap(), 10);
/// ```
pub fn spawn_blocking<T, R>(&self, task: T) -> JoinHandle<R>
where
T: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
let handle = self.0.spawn_blocking(task);
JoinHandle(handle)
}
/// Blocks the current thread and runs the given future (usually a JoinHandle) to completion,
/// and gets its return value.
///
/// Any code after the `block_on` will be executed once the future is done.
///
/// Don't use this method on an asynchronous environment, since it will block the worker
/// thread and may cause deadlock.
pub fn block_on<T, R>(&self, task: T) -> R
where
T: Future<Output = R>,
{
self.0.block_on(task)
}
}
pub(crate) fn get_global_runtime() -> &'static Runtime {
static mut GLOBAL_DEFAULT_RT: MaybeUninit<Runtime> = MaybeUninit::uninit();
static ONCE: Once = Once::new();
unsafe {
ONCE.call_once(|| {
let rt = match RuntimeBuilder::new_multi_thread().build_inner() {
Ok(rt) => rt,
Err(e) => panic!("initializing global runtime failed: {:?}", e),
};
GLOBAL_DEFAULT_RT = MaybeUninit::new(rt);
});
&*GLOBAL_DEFAULT_RT.as_ptr()
}
}

View File

@ -1,206 +0,0 @@
use std::fs::{Metadata, Permissions};
use std::io;
use std::io::{Error, SeekFrom};
use std::path::Path;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::fs::File as TokioFile;
use tokio::io::ReadBuf;
use crate::io::{AsyncRead, AsyncSeek, AsyncWrite};
/// An asynchronous wrapping of [`std::fs::File`]. Provides async read/write methods.
pub struct File(pub(crate) TokioFile);
impl File {
pub fn new(file: std::fs::File) -> File {
File(TokioFile::from_std(file))
}
/// Attempts to open a file in read-only mode asynchronously.
///
/// # Errors
/// This function will return an error if `path` does not already exist
///
/// # Examples
///
/// ```no_run
/// use ylong_runtime::fs::async_file::File;
/// async fn open() -> std::io::Result<()> {
/// let mut f = File::open("foo.txt").await;
/// Ok(())
/// }
/// ```
pub async fn open<P: AsRef<Path>>(path: P) -> io::Result<File> {
Ok(File(TokioFile::open(path).await?))
}
/// Opens a file in write-only mode asynchronously.
///
/// This function will create a file if it does not exist
/// and truncate it if it does.
///
/// # Examples
/// ```no_run
/// use ylong_runtime::fs::async_file::File;
/// async fn create() -> std::io::Result<()> {
/// let mut f = File::create("foo.txt").await?;
/// Ok(())
/// }
/// ```
pub async fn create<P: AsRef<Path>>(path: P) -> io::Result<File> {
Ok(File(TokioFile::create(path).await?))
}
/// Changes the permissions on the underlying file asynchronously.
///
/// # Errors
/// This function will return an error if the user lacks permission change
/// attributes on the underlying file. It may also return an error in other
/// os-specific unspecified cases.
///
/// # Examples
/// ```no_run
/// use ylong_runtime::fs::File;
///
/// async fn set_permissions() -> std::io::Result<()> {
/// let file = File::open("foo.txt").await?;
/// let mut perms = file.metadata().await?.permissions();
/// perms.set_readonly(true);
/// file.set_permissions(perms).await?;
/// Ok(())
/// }
///
/// ```
///
/// Note that this method alters the permissions of the underlying file,
/// even though it takes ``&self` rahter than `&mut self`.
pub async fn set_permissions(&self, perm: Permissions) -> io::Result<()> {
self.0.set_permissions(perm).await
}
/// Attempts to sync all OS-internal metadata to disk asynchronously.
///
/// This function will attempt to ensure that all in-memory data reaches the
/// filesystem before returning.
///
/// This can be used to handle errors that would otherwise only be caught
/// when the `File` is closed. Dropping a file will ignore errors in
/// synchronizing this in-memory data.
///
/// # Examples
/// ```no_run
///
/// use ylong_runtime::io::AsyncWriteExt;
/// use ylong_runtime::fs::File;
/// async fn sync_all() -> std::io::Result<()> {
/// let mut f = File::create("foo.txt").await?;
/// f.write_all(b"Hello, world!").await?;
/// f.sync_all().await?;
/// Ok(())
/// }
/// ```
pub async fn sync_all(&self) -> io::Result<()> {
self.0.sync_all().await
}
/// This function is similar to [`File::sync_all`], except that it might not
/// synchronize file metadata to the filesystem.
///
/// This is intended for use cases that must synchronize content, but don't
/// need the metadata on disk. The goal of this method is to reduce disk
/// operations.
///
/// # Examples
///
/// ```no_run
/// use ylong_runtime::io::AsyncWriteExt;
/// use ylong_runtime::fs::File;
/// async fn sync_data() -> std::io::Result<()> {
/// let mut f = File::create("foo.txt").await?;
/// f.write_all(b"Hello, word!").await?;
/// f.sync_data().await?;
/// Ok(())
/// }
/// ```
pub async fn sync_data(&self) -> io::Result<()> {
self.0.sync_data().await
}
/// Queries metadata about hte underlying file asynchronously.
///
/// # Exmaples
/// ```no_run
/// use ylong_runtime::fs::File;
/// async fn metadata() -> std::io::Result<()> {
/// let mut f = File::open("foo.txt").await?;
/// let metadata = f.metadata().await;
/// Ok(())
/// }
/// ```
pub async fn metadata(&self) -> io::Result<Metadata> {
self.0.metadata().await
}
/// Truncates or extends the underlying file, updating the size of this file to become size.
///
/// If the size is less than the current file's size, then the file will be
/// shrunk. If it is greater than the current file's size, then the file
/// will be extended to size and have all of the intermediate data filled in
/// with 0s.
///
/// # Examples
///
/// ```no_run
/// use ylong_runtime::fs::File;
/// use ylong_runtime::io::AsyncWriteExt;
///
/// async fn set_length() -> std::io::Result<()> {
/// let mut file = File::create("foo.txt").await?;
/// file.write_all(b"Hello World!").await?;
/// file.set_len(10).await?;
/// Ok(())
/// }
/// ```
pub async fn set_len(&self, size: u64) -> io::Result<()> {
self.0.set_len(size).await
}
}
impl AsyncSeek for File {
fn start_seek(self: Pin<&mut Self>, position: SeekFrom) -> io::Result<()> {
Pin::new(&mut self.get_mut().0).start_seek(position)
}
fn poll_complete(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<u64>> {
Pin::new(&mut self.get_mut().0).poll_complete(cx)
}
}
impl AsyncRead for File {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
Pin::new(&mut self.get_mut().0).poll_read(cx, buf)
}
}
impl AsyncWrite for File {
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<Result<usize, Error>> {
Pin::new(&mut self.get_mut().0).poll_write(cx, buf)
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
Pin::new(&mut self.get_mut().0).poll_flush(cx)
}
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
Pin::new(&mut self.get_mut().0).poll_shutdown(cx)
}
}

View File

@ -1,4 +0,0 @@
pub mod async_file;
pub mod open_options;
pub use async_file::File;

View File

@ -1,82 +0,0 @@
use crate::fs::async_file::File;
use std::io;
use std::path::Path;
use tokio::fs::OpenOptions as TokioOptions;
/// An asynchronous version of the [`std::fs::OpenOptions`].
///
/// Options and flags which can be used to configure how a file is opened.
#[derive(Clone, Debug)]
pub struct OpenOptions(TokioOptions);
impl OpenOptions {
/// Creates a blank new set of options ready for configuration when opening a file.
///
/// All options are initially set to `false` just like [`std::fs::OpenOptions::new`]
pub fn new() -> OpenOptions {
OpenOptions(TokioOptions::new())
}
/// Sets the option for file read access.
///
/// This option, when true, will inidicate that the file should be
/// `read`-able if opened
pub fn read(&mut self, read: bool) -> &mut OpenOptions {
self.0.read(read);
self
}
/// Sets the option for file write access.
///
/// This option, when true, will indicate that file should be
/// `write`-able if opened.
pub fn write(&mut self, write: bool) -> &mut OpenOptions {
self.0.write(write);
self
}
/// Sets the option for the file append mode.
///
/// This option, when true, means that writes will append to a file instead of
/// overwriting previous contents.
pub fn append(&mut self, append: bool) -> &mut OpenOptions {
self.0.append(append);
self
}
/// Sets the option for truncating a file's previous content.
///
/// If a file is successfully opened with this option set, it will truncate
/// the file to 0 length if it already exists. Any already-existed content
/// in this file will be dropped.
pub fn truncate(&mut self, truncate: bool) -> &mut Self {
self.0.truncate(truncate);
self
}
/// Sets the option to create a new file if it does not already exist, or simply
/// open it if it does exist.
///
/// In order for the file to be created, [`OpenOptions::write`] or
/// [`OpenOptions::append`] access must be set to true.
pub fn create(&mut self, create: bool) -> &mut Self {
self.0.create(create);
self
}
/// Sets the option to create a new file.
///
/// If the file alreadys exists, opening the file with the option set will cause an error.
pub fn create_new(&mut self, create_new: bool) -> &mut Self {
self.0.create_new(create_new);
self
}
/// Asynchronously opens a file at `path` with the options specified by `self`.
///
/// # Errors
/// Check std::file::OpenOptions
pub async fn open<P: AsRef<Path>>(&self, path: P) -> io::Result<File> {
Ok(File(self.0.open(path).await?))
}
}

View File

@ -1,3 +0,0 @@
pub use tokio::io::{
AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt, AsyncWrite, AsyncWriteExt, ReadBuf,
};

View File

@ -1,44 +0,0 @@
pub mod builder;
pub mod executor;
#[cfg(feature = "fs")]
pub mod fs;
pub mod io;
#[cfg(feature = "net")]
pub mod net;
#[cfg(feature = "sync")]
pub mod sync;
pub mod task;
#[cfg(feature = "timer")]
pub mod timer;
pub use task::join_handle;
use crate::executor::get_global_runtime;
use crate::join_handle::JoinHandle;
use std::future::Future;
/// Spawns a task onto the global runtime.
pub fn spawn<T, R>(task: T) -> JoinHandle<R>
where
T: Future<Output = R> + Send + 'static,
R: Send + 'static,
{
get_global_runtime().spawn(task)
}
/// Spawns a blocking task onto the blocking pool.
pub fn spawn_blocking<T, R>(task: T) -> JoinHandle<R>
where
T: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
get_global_runtime().spawn_blocking(task)
}
/// Blocks the current thread until the `Future` passed in is completed.
pub fn block_on<T>(task: T) -> T::Output
where
T: Future,
{
get_global_runtime().block_on(task)
}

View File

@ -1,5 +0,0 @@
pub mod tcp;
pub mod udp;
pub use tcp::TcpListener;
pub use udp::UdpSocket;

View File

@ -1,66 +0,0 @@
use crate::net::tcp::stream::TcpStream;
use std::io;
use std::net::SocketAddr;
use tokio::net::TcpListener as TokioListener;
/// An asynchronous version of [`std::net::TcpListener`]. Provides async bind/accept methods.
///
/// # Example
///
/// ```
/// use tokio::io;
/// use ylong_runtime::net::TcpListener;
///
/// async fn io_func() -> io::Result<()> {
/// let addr = "127.0.0.1:8080".parse().unwrap();
/// let server = TcpListener::bind(addr).await?;
/// let (stream, addr) = server.accept().await?;
/// Ok(())
/// }
/// ```
pub struct TcpListener(TokioListener);
impl TcpListener {
/// A TCP socket server, asynchronously listening for connections.
///
/// After creating a `TcpListener` by binding it to a socket address, it listens
/// for incoming TCP connections asynchronously. These connections can be accepted
/// by calling [`TcpListener::accept`]
///
/// # Example
/// ```
/// use std::io;
/// use ylong_runtime::net::TcpListener;
/// async fn io_func() -> io::Result<()> {
/// let addr = "127.0.0.1:8080".parse().unwrap();
/// let server = TcpListener::bind(addr).await;
/// Ok(())
/// }
/// ```
pub async fn bind(addr: SocketAddr) -> io::Result<TcpListener> {
Ok(TcpListener(TokioListener::bind(addr).await?))
}
/// Asynchronously accepts a new incoming connection from this listener.
///
/// When connection gets established, the corresponding [`TcpStream`] and the remote
/// peer's address will be returned.
///
/// # Example
///
/// ```
/// use tokio::io;
/// use ylong_runtime::net::TcpListener;
///
/// async fn io_func() -> io::Result<()> {
/// let addr = "127.0.0.1:8080".parse().unwrap();
/// let server = TcpListener::bind(addr).await?;
/// let (stream, addr) = server.accept().await?;
/// Ok(())
/// }
/// ```
pub async fn accept(&self) -> io::Result<(TcpStream, SocketAddr)> {
let (s, a) = self.0.accept().await?;
Ok((TcpStream(s), a))
}
}

View File

@ -1,5 +0,0 @@
mod listener;
mod stream;
pub use listener::TcpListener;
pub use stream::TcpStream;

View File

@ -1,49 +0,0 @@
use std::io::Error;
use std::net::SocketAddr;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::io;
use tokio::io::AsyncWriteExt;
use tokio::net::TcpStream as TokioStream;
use crate::io::{AsyncRead, AsyncWrite, ReadBuf};
pub struct TcpStream(pub(crate) TokioStream);
impl TcpStream {
pub async fn connect(addr: SocketAddr) -> io::Result<Self> {
Ok(TcpStream(TokioStream::connect(addr).await?))
}
pub async fn shutdown(&mut self) -> io::Result<()> {
self.0.shutdown().await
}
}
impl AsyncRead for TcpStream {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
Pin::new(&mut self.get_mut().0).poll_read(cx, buf)
}
}
impl AsyncWrite for TcpStream {
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<Result<usize, Error>> {
Pin::new(&mut self.get_mut().0).poll_write(cx, buf)
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
Pin::new(&mut self.get_mut().0).poll_flush(cx)
}
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
Pin::new(&mut self.get_mut().0).poll_shutdown(cx)
}
}

View File

@ -1,271 +0,0 @@
use crate::io::ReadBuf;
use std::io;
use std::net::SocketAddr;
use std::task::{Context, Poll};
use tokio::net::UdpSocket as TokioUdp;
/// Asynchronous UdpSockets.
pub struct UdpSocket(TokioUdp);
/// A connected asynchronous UdpSocket.
pub struct ConnectedUdpSocket(TokioUdp);
impl UdpSocket {
/// Creates a new UDP socket and attempts to bind it to the address provided,
///
/// # Examples
/// ```
/// use std::io;
/// use ylong_runtime::net::UdpSocket;
/// async fn io_func() -> io::Result<()> {
/// let addr = "127.0.0.1:8080".parse().unwrap();
/// let mut sock = UdpSocket::bind(addr).await;
/// Ok(())
/// }
/// ```
pub async fn bind(addr: SocketAddr) -> io::Result<Self> {
Ok(UdpSocket(TokioUdp::bind(addr).await?))
}
/// Sets the default address for the UdpSocket and limits packets to those are
/// read via recv from the specific address.
///
/// Returns the connected UdpSocket if succeeds.
pub async fn connect(self, addr: SocketAddr) -> io::Result<ConnectedUdpSocket> {
self.0.connect(addr).await?;
Ok(ConnectedUdpSocket(self.0))
}
/// Returns the local address that this socket is bound to.
///
/// # Examples
///
/// ```
/// use std::io;
/// use ylong_runtime::net::UdpSocket;
/// async fn io_func() -> io::Result<()> {
/// let addr = "127.0.0.1:8080".parse().unwrap();
/// let mut sock = UdpSocket::bind(addr).await?;
/// let local_addr = sock.local_addr()?;
/// Ok(())
/// }
/// ```
pub fn local_addr(&self) -> io::Result<SocketAddr> {
self.0.local_addr()
}
/// Sends the buffer to the given address. On success, returns the number of bytes written.
/// This will return an error when the IP version of the local socket does not
/// match the one returned from SocketAddr.
///
/// # Exampels
/// ```
/// use std::io;
/// use ylong_runtime::net::UdpSocket;
/// async fn io_func() -> io::Result<()> {
/// let local_addr = "127.0.0.1:8080".parse().unwrap();
/// let sock = UdpSocket::bind(local_addr).await?;
/// let remote_addr = "127.0.0.1:8081".parse().unwrap();
/// let len = sock.send_to(b"hello world", remote_addr).await?;
/// Ok(())
/// }
/// ```
pub async fn send_to(&self, buf: &[u8], target: SocketAddr) -> io::Result<usize> {
self.0.send_to(buf, target).await
}
/// Attempts to send the buffer to a given address.
pub fn poll_send_to(
&self,
cx: &mut Context<'_>,
buf: &[u8],
target: SocketAddr,
) -> Poll<io::Result<usize>> {
self.0.poll_send_to(cx, buf, target)
}
/// Receives a single datagram message on the socket. On success, returns the number of
/// bytes read and the origin. The function must be called with valid type array buf of
/// sufficient size of hold the message bytes. If a message is too long to fit in the
/// supplied buffer, excess bytes may be discarded.
///
/// # Examples
///
/// ```
/// use std::io;
/// use ylong_runtime::net::UdpSocket;
/// async fn io_func() -> io::Result<()> {
/// let local_addr = "127.0.0.1:8080".parse().unwrap();
/// let sock = UdpSocket::bind(local_addr).await?;
/// let mut recv_buf = [0u8; 12];
/// let (len, addr) = sock.recv_from(&mut recv_buf).await?;
/// Ok(())
/// }
/// ```
pub async fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
self.0.recv_from(buf).await
}
/// Attempts to receive a single datagram on the socket.
pub fn poll_recv_from(
&self,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<SocketAddr>> {
self.0.poll_recv_from(cx, buf)
}
/// Sets the value of the `SO_BROADCAST` option for this socket.
/// When enabled, this socket is allowed to send packets to a broadcast address.
///
/// # Examples
/// ```
/// use std::io;
/// use ylong_runtime::net::UdpSocket;
/// async fn io_func() -> io::Result<()> {
/// let local_addr = "127.0.0.1:8080".parse().unwrap();
/// let sock = UdpSocket::bind(local_addr).await?;
/// if sock.broadcast()? == false {
/// sock.set_broadcast(true)?;
/// }
/// assert_eq!(sock.broadcast()?, true);
/// Ok(())
/// }
/// ```
pub fn set_broadcast(&self, on: bool) -> io::Result<()> {
self.0.set_broadcast(on)
}
/// Gets the value of the `SO_BROADCAST` option for this socket.
///
/// # Examples
///
/// ```
/// use std::io;
/// use ylong_runtime::net::UdpSocket;
/// async fn io_func() -> io::Result<()> {
/// let local_addr = "127.0.0.1:8080".parse().unwrap();
/// let sock = UdpSocket::bind(local_addr).await?;
/// assert_eq!(sock.broadcast()?, false);
/// Ok(())
/// }
/// ```
pub fn broadcast(&self) -> io::Result<bool> {
self.0.broadcast()
}
}
impl ConnectedUdpSocket {
/// Returns the local address that this socket is bound to.
///
/// # Examples
/// ```
/// use std::io;
/// use ylong_runtime::net::UdpSocket;
/// async fn io_func() -> io::Result<()> {
/// let local_addr = "127.0.0.1:8080".parse().unwrap();
/// let sock = UdpSocket::bind(local_addr).await?;
/// let remote_addr = "127.0.0.1:8081".parse().unwrap();
/// let connected_sock = match sock.connect(remote_addr).await {
/// Ok(socket) => socket,
/// Err(e) => return Err(e),
/// };
/// let local_addr = connected_sock.local_addr()?;
/// Ok(())
/// }
/// ```
pub fn local_addr(&self) -> io::Result<SocketAddr> {
self.0.local_addr()
}
/// Returns the socket address of the remote peer this socket was connected to.
///
/// # Examples
/// ```
/// use std::io;
/// use ylong_runtime::net::UdpSocket;
/// async fn io_func() -> io::Result<()> {
/// let local_addr = "127.0.0.1:8080".parse().unwrap();
/// let sock = UdpSocket::bind(local_addr).await?;
/// let remote_addr = "127.0.0.1:8081".parse().unwrap();
/// let connected_sock = match sock.connect(remote_addr).await {
/// Ok(socket) => socket,
/// Err(e) => return Err(e),
/// };
/// assert_eq!(connected_sock.peer_addr()?, remote_addr);
/// Ok(())
/// }
/// ```
pub fn peer_addr(&self) -> io::Result<SocketAddr> {
self.0.peer_addr()
}
/// Sends data on the socket to the remote address that the socket is connected to.
/// The connect method will connect this socket to a remote address.
/// This method will fail if the socket is not connected.
///
/// # Return value
/// On success, the number of bytes sent is returned, otherwise, the encountered error
/// is returned.
///
/// # Examples
/// ```
/// use std::io;
/// use ylong_runtime::net::UdpSocket;
/// async fn io_func() -> io::Result<()> {
/// let local_addr = "127.0.0.1:8080".parse().unwrap();
/// let sock = UdpSocket::bind(local_addr).await?;
/// let remote_addr = "127.0.0.1:8081".parse().unwrap();
/// let connected_sock = match sock.connect(remote_addr).await {
/// Ok(socket) => socket,
/// Err(e) => return Err(e),
/// };
/// connected_sock.send(b"hello").await;
/// Ok(())
/// }
/// ```
pub async fn send(&self, buf: &[u8]) -> io::Result<usize> {
self.0.send(buf).await
}
/// Attempts to send data on the socket to the remote address to which is was
/// previously connected.
pub fn poll_send(&self, cx: &mut Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>> {
self.0.poll_send(cx, buf)
}
/// Receives a single datagram message on the socket from the remote address to which it is
/// connected. On success, returns the number of bytes read.
/// The function must be called with valid bytes array buf of sufficient size to hold
/// the message bytes.
/// If a message is too long to fit in the supplied buffer, excess bytes may be discarded.
/// The connect method will connect this socket to a remote address.
/// This method will fail if the socket is not connected.
///
/// # Examples
/// ```
/// use std::io;
/// use ylong_runtime::net::UdpSocket;
/// async fn io_func() -> io::Result<()> {
/// let local_addr = "127.0.0.1:8080".parse().unwrap();
/// let sock = UdpSocket::bind(local_addr).await?;
/// let remote_addr = "127.0.0.1:8081".parse().unwrap();
/// let connected_sock = match sock.connect(remote_addr).await {
/// Ok(socket) => socket,
/// Err(e) => return Err(e),
/// };
/// let mut recv_buf = [0u8; 12];
/// let n = connected_sock.recv(&mut recv_buf[..]).await?;
/// Ok(())
/// }
/// ```
pub async fn recv(&self, buf: &mut [u8]) -> io::Result<usize> {
self.0.recv(buf).await
}
/// Attempts to receive a single datagram message on the socket from the remote address to
/// which it is connected.
pub fn poll_recv(&self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>) -> Poll<io::Result<()>> {
self.0.poll_recv(cx, buf)
}
}

View File

@ -1,40 +0,0 @@
use std::error::Error;
use std::fmt::{Debug, Display, Formatter};
#[derive(Debug, Eq, PartialEq)]
pub enum SendError<T> {
Full(T),
Closed(T),
Timeout(T),
}
impl<T> Display for SendError<T> {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
SendError::Full(_) => write!(f, "channel is full"),
SendError::Closed(_) => write!(f, "channel has been closed"),
SendError::Timeout(_) => write!(f, "channel sending timeout"),
}
}
}
impl<T: Debug> Error for SendError<T> {}
#[derive(Debug, Eq, PartialEq, Clone)]
pub enum RecvError {
Empty,
Closed,
TimeOut,
}
impl Display for RecvError {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
RecvError::Empty => write!(f, "channel is empty"),
RecvError::Closed => write!(f, "channel has been closed"),
RecvError::TimeOut => write!(f, "channel receiving timeout"),
}
}
}
impl Error for RecvError {}

View File

@ -1,26 +0,0 @@
use std::error::Error;
use std::fmt::{Display, Formatter};
pub mod error;
pub mod mpsc;
pub(crate) mod mutex;
pub(crate) mod notify;
pub mod oneshot;
pub(crate) mod rwlock;
pub(crate) mod semaphore;
pub use mutex::{Mutex, MutexGuard};
pub use notify::Notify;
pub use rwlock::{RwLock, RwLockReadGuard, RwLockWriteGuard};
pub use semaphore::{Semaphore, SemaphoreError, SemaphorePermit};
#[derive(Debug, Eq, PartialEq, Clone)]
pub struct LockError;
impl Display for LockError {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "The attempt to get the mutex failed")
}
}
impl Error for LockError {}

View File

@ -1,276 +0,0 @@
use crate::sync::error::{RecvError, SendError};
#[cfg(feature = "timer")]
use crate::timer::timeout::timeout;
#[cfg(feature = "timer")]
use std::time::Duration;
#[cfg(feature = "timer")]
use tokio::sync::mpsc::error::SendTimeoutError;
use tokio::sync::mpsc::error::{TryRecvError, TrySendError};
use tokio::sync::mpsc::{channel, Receiver, Sender};
/// The sender of the bounded channel.
/// A [`BoundedSender`] and [`BoundedReceiver`] handle pair are created by the
/// [`bounded_channel`] function.
/// There could be multiple senders for a channel.
#[derive(Clone)]
pub struct BoundedSender<T>(Sender<T>);
/// The receiver of the bounded channel.
/// There could be only one receiver for a channel.
pub struct BoundedReceiver<T>(Receiver<T>);
/// Creates a new mpsc channel, and returns the `Sender` and `Receiver` handle pair.
/// The channel is bounded with the passed in capacity.
///
/// # Examples
/// ```
/// use ylong_runtime::sync::mpsc::bounded_channel;
///
/// ylong_runtime::block_on(async move {
/// let (tx, mut rx) = bounded_channel(1);
/// let handle = ylong_runtime::spawn(async move {
/// assert_eq!(rx.recv().await, Ok(1));
/// });
/// let handle2 = ylong_runtime::spawn(async move {
/// assert!(tx.send(1).await.is_ok());
/// });
/// let _ = handle.await;
/// let _ = handle2.await;
/// });
/// ```
pub fn bounded_channel<T>(capacity: usize) -> (BoundedSender<T>, BoundedReceiver<T>) {
let (s, r) = channel(capacity);
(BoundedSender(s), BoundedReceiver(r))
}
impl<T> BoundedSender<T> {
/// Attempts to send a value to the associated [`BoundedReceiver`].
///
/// If the receiver has been closed or the channel is full, this method will
/// return an error containing the sent value
///
/// # Examples
///
/// ```
/// use ylong_runtime::sync::mpsc::bounded_channel;
/// use ylong_runtime::sync::error::RecvError;
///
/// ylong_runtime::block_on(async move {
/// let (tx, mut rx) = bounded_channel(1);
/// match rx.try_recv() {
/// Err(RecvError::Empty) => {}
/// _ => unreachable!(),
/// }
/// tx.try_send(1).unwrap();
/// match rx.try_recv() {
/// Ok(x) => assert_eq!(x, 1),
/// _ => unreachable!(),
/// }
/// });
/// ```
pub fn try_send(&self, value: T) -> Result<(), SendError<T>> {
self.0.try_send(value).map_err(|e| match e {
TrySendError::Full(x) => SendError::Full(x),
TrySendError::Closed(x) => SendError::Closed(x),
})
}
/// Sends a value to the associated receiver.
///
/// If the receiver has been closed, this method will return an error containing the
/// sent value.
///
/// # Examples
///
/// ```
/// use ylong_runtime::sync::mpsc::bounded_channel;
/// use ylong_runtime::sync::error::RecvError;
/// ylong_runtime::block_on(async move {
/// let (tx, mut rx) = bounded_channel(1);
/// let handle = ylong_runtime::spawn(async move {
/// assert_eq!(rx.recv().await, Ok(1));
/// });
/// let handle2 = ylong_runtime::spawn(async move {
/// assert!(tx.send(1).await.is_ok());
/// });
/// let _ = handle.await;
/// let _ = handle2.await;
/// });
/// ```
pub async fn send(&self, value: T) -> Result<(), SendError<T>> {
self.0.send(value).await.map_err(|e| SendError::Closed(e.0))
}
/// Attempts to send a value to the associated receiver in a limited amount of time.
///
/// If the receiver has been closed or the time limit has been passed, this method
/// will return an error containing the sent value.
///
/// # Examples
///
/// ```
/// use std::time::Duration;
/// use ylong_runtime::sync::mpsc::bounded_channel;
/// ylong_runtime::block_on(async move {
/// let (tx, mut rx) = bounded_channel(1);
/// let handle = ylong_runtime::spawn(async move {
/// assert_eq!(rx.recv().await, Ok(1));
/// });
/// let handle2 = ylong_runtime::spawn(async move {
/// assert!(tx.send_timeout(1, Duration::from_millis(10)).await.is_ok());
/// });
/// let _ = handle.await;
/// let _ = handle2.await;
/// });
/// ```
#[cfg(feature = "timer")]
pub async fn send_timeout(&self, value: T, time: Duration) -> Result<(), SendError<T>> {
self.0.send_timeout(value, time).await.map_err(|e| match e {
SendTimeoutError::Timeout(x) => SendError::Timeout(x),
SendTimeoutError::Closed(x) => SendError::Closed(x),
})
}
/// Checks whether the channel is closed. If so, the sender could not
/// send values anymore.
///
/// # Examples
/// ```
/// use ylong_runtime::sync::mpsc::bounded_channel;
/// let (tx, rx) = bounded_channel::<isize>(1);
/// assert!(!tx.is_closed());
/// drop(rx);
/// assert!(tx.is_closed());
/// ```
pub fn is_closed(&self) -> bool {
self.0.is_closed()
}
/// Checks whether the sender and another send belong to the same channel.
///
/// # Examples
/// ```
/// use ylong_runtime::sync::mpsc::bounded_channel;
/// let (tx, rx) = bounded_channel::<isize>(1);
/// let tx2 = tx.clone();
/// assert!(tx.is_same(&tx2));
/// ```
pub fn is_same(&self, other: &Self) -> bool {
self.0.same_channel(&other.0)
}
/// Gets the capacity of the channel.
///
/// # Examples
/// ```
/// use ylong_runtime::sync::mpsc::bounded_channel;
/// let (tx, rx) = bounded_channel::<isize>(5);
/// assert_eq!(tx.capacity(), 5);
/// ```
pub fn capacity(&self) -> usize {
self.0.capacity()
}
}
impl<T> BoundedReceiver<T> {
/// Attempts to receive a value from the assocaited [`BoundedSender`].
///
/// # Return value
/// * `Ok(T)` if receiving a value successfully.
/// * `Err(RecvError::Empty)` if no value has been sent yet.
/// * `Err(RecvError::Closed)` if all senders have been dropped.
///
/// # Examples
/// ```
/// use ylong_runtime::sync::mpsc::bounded_channel;
/// use ylong_runtime::sync::error::RecvError;
/// let (tx, mut rx) = bounded_channel(1);
/// match rx.try_recv() {
/// Err(RecvError::Empty) => {},
/// _ => unreachable!(),
/// }
/// tx.try_send(1).unwrap();
/// match rx.try_recv() {
/// Ok(_) => {},
/// _ => unreachable!(),
/// }
/// drop(tx);
/// match rx.try_recv() {
/// Err(RecvError::Closed) => {}
/// _ => unreachable!(),
/// }
/// ```
pub fn try_recv(&mut self) -> Result<T, RecvError> {
self.0.try_recv().map_err(|e| match e {
TryRecvError::Empty => RecvError::Empty,
TryRecvError::Disconnected => RecvError::Closed,
})
}
/// Receives a value from the associated [`BoundedSender`].
///
/// The `receiver` can still receive all sent messages in the channel after the
/// channel is closed.
///
/// # Return value
/// * `Ok(T)` if receiving a value successfully.
/// * `Err(RecvError::Closed)` if all senders has been dropped.
///
/// # Examples
///
/// ```
/// use ylong_runtime::sync::mpsc::bounded_channel;
/// ylong_runtime::block_on(async move {
/// let (tx, mut rx) = bounded_channel(1);
/// let handle = ylong_runtime::spawn(async move {
/// assert_eq!(rx.recv().await, Ok(1))
/// });
/// tx.try_send(1).unwrap();
/// let _ = handle.await;
/// });
/// ```
pub async fn recv(&mut self) -> Result<T, RecvError> {
match self.0.recv().await {
None => Err(RecvError::Closed),
Some(x) => Ok(x),
}
}
/// Receives a value from the assocaited [`BoundedSender`] in a limited amount of time.
///
/// The `receiver` can still receive all sent messages in the channel after the channel
/// is closed.
///
/// # Return value
/// * `Ok(T)` if receiving a value successfully.
/// * `Err(RecvError::Closed)` if all senders has been dropped.
/// * `Err(RecvError::Timeout)` if time limit has been passed.
#[cfg(feature = "timer")]
pub async fn recv_timeout(&mut self, time: Duration) -> Result<T, RecvError> {
match timeout(time, self.recv()).await {
Ok(res) => res,
Err(_) => Err(RecvError::TimeOut),
}
}
/// Closes the channel, prevents the `Sender` from sending more values.
///
/// The `Sender` will fail to call `send` or `try_send` after the `Receiver` called
/// `close`. It will do nothing if the channel is closed.
///
/// # Exampels
/// ```
/// use ylong_runtime::sync::mpsc::bounded_channel;
/// ylong_runtime::block_on(async move {
/// let (tx, mut rx) = bounded_channel(1);
/// assert!(!tx.is_closed());
///
/// rx.close();
/// assert!(tx.is_closed());
/// assert!(tx.try_send("no receive").is_err());
/// });
/// ```
pub fn close(&mut self) {
self.0.close()
}
}

View File

@ -1,5 +0,0 @@
pub(crate) mod bounded;
pub(crate) mod unbounded;
pub use bounded::{bounded_channel, BoundedReceiver, BoundedSender};
pub use unbounded::{unbounded_channel, UnboundedReceiver, UnboundedSender};

View File

@ -1,196 +0,0 @@
use crate::sync::error::{RecvError, SendError};
#[cfg(feature = "timer")]
use crate::timer::timeout::timeout;
#[cfg(feature = "timer")]
use std::time::Duration;
use tokio::sync::mpsc::error::TryRecvError;
use tokio::sync::mpsc::UnboundedReceiver as TokioReceiver;
use tokio::sync::mpsc::UnboundedSender as TokioSender;
/// The sender of the unbounded channel.
/// A [`UnboundedSender`] and [`UnboundedReceiver`] handle pair are created by the
/// [`unbounded_channel`] function.
/// There could be multiple senders for a channel.
#[derive(Clone)]
pub struct UnboundedSender<T>(TokioSender<T>);
/// The receiver of the unbounded channel.
/// There could be only one receiver for a channel.
pub struct UnboundedReceiver<T>(TokioReceiver<T>);
/// Creates a new mpsc channel, and returns the `Sender` and `Receiver` handle pair.
/// The channel is unbounded.
///
/// # Examples
/// ```
/// use ylong_runtime::sync::mpsc::unbounded_channel;
///
/// ylong_runtime::block_on(async move {
/// let (tx, mut rx) = unbounded_channel();
/// let handle = ylong_runtime::spawn(async move {
/// assert_eq!(rx.recv().await, Ok(1));
/// });
/// let handle2 = ylong_runtime::spawn(async move {
/// assert!(tx.send(1).is_ok());
/// });
/// let _ = handle.await;
/// let _ = handle2.await;
/// });
/// ```
pub fn unbounded_channel<T>() -> (UnboundedSender<T>, UnboundedReceiver<T>) {
let (s, r) = tokio::sync::mpsc::unbounded_channel();
(UnboundedSender(s), UnboundedReceiver(r))
}
impl<T> UnboundedSender<T> {
/// Sends a value to the associated receiver.
///
/// If the receiver has been closed, this method will return an error containing the
/// sent value.
///
/// # Examples
///
/// ```
/// use ylong_runtime::sync::mpsc::unbounded_channel;
/// use ylong_runtime::sync::error::RecvError;
/// ylong_runtime::block_on(async move {
/// let (tx, mut rx) = unbounded_channel();
/// let handle = ylong_runtime::spawn(async move {
/// assert_eq!(rx.recv().await, Ok(1));
/// });
/// let handle2 = ylong_runtime::spawn(async move {
/// assert!(tx.send(1).is_ok());
/// });
/// let _ = handle.await;
/// let _ = handle2.await;
/// });
/// ```
pub fn send(&self, value: T) -> Result<(), SendError<T>> {
self.0.send(value).map_err(|e| SendError::Closed(e.0))
}
/// Checks whether the channel is closed. If so, the sender could not
/// send values anymore.
///
/// # Examples
/// ```
/// use ylong_runtime::sync::mpsc::unbounded_channel;
/// let (tx, rx) = unbounded_channel::<isize>();
/// assert!(!tx.is_closed());
/// drop(rx);
/// assert!(tx.is_closed());
/// ```
pub fn is_closed(&self) -> bool {
self.0.is_closed()
}
/// Checks whether the sender and another send belong to the same channel.
///
/// # Examples
/// ```
/// use ylong_runtime::sync::mpsc::unbounded_channel;
/// let (tx, rx) = unbounded_channel::<isize>();
/// let tx2 = tx.clone();
/// assert!(tx.is_same(&tx2));
/// ```
pub fn is_same(&self, other: &Self) -> bool {
self.0.same_channel(&other.0)
}
}
impl<T> UnboundedReceiver<T> {
/// Attempts to receive a value from the assocaited [`UnboundedSender`].
///
/// # Return value
/// * `Ok(T)` if receiving a value successfully.
/// * `Err(RecvError::Empty)` if no value has been sent yet.
/// * `Err(RecvError::Closed)` if all senders have been dropped.
///
/// # Examples
/// ```
/// use ylong_runtime::sync::mpsc::unbounded_channel;
/// use ylong_runtime::sync::error::RecvError;
/// let (tx, mut rx) = unbounded_channel();
/// match rx.try_recv() {
/// Err(RecvError::Empty) => {},
/// _ => unreachable!(),
/// }
/// tx.send(1).unwrap();
/// match rx.try_recv() {
/// Ok(_) => {},
/// _ => unreachable!(),
/// }
pub fn try_recv(&mut self) -> Result<T, RecvError> {
self.0.try_recv().map_err(|e| match e {
TryRecvError::Empty => RecvError::Empty,
TryRecvError::Disconnected => RecvError::Empty,
})
}
/// Receives a value from the associated [`UnboundedSender`].
///
/// The `receiver` can still receive all sent messages in the channel after the
/// channel is closed.
///
/// # Return value
/// * `Ok(T)` if receiving a value successfully.
/// * `Err(RecvError::Closed)` if all senders has been dropped.
///
/// # Examples
///
/// ```
/// use ylong_runtime::sync::mpsc::unbounded_channel;
/// ylong_runtime::block_on(async move {
/// let (tx, mut rx) = unbounded_channel();
/// let handle = ylong_runtime::spawn(async move {
/// assert_eq!(rx.recv().await, Ok(1))
/// });
/// tx.send(1).unwrap();
/// let _ = handle.await;
/// });
/// ```
pub async fn recv(&mut self) -> Result<T, RecvError> {
match self.0.recv().await {
None => Err(RecvError::Closed),
Some(x) => Ok(x),
}
}
/// Receives a value from the assocaited [`UnboundedSender`] in a limited amount of time.
///
/// The `receiver` can still receive all sent messages in the channel after the channel
/// is closed.
///
/// # Return value
/// * `Ok(T)` if receiving a value successfully.
/// * `Err(RecvError::Closed)` if all senders has been dropped.
/// * `Err(RecvError::Timeout)` if time limit has been passed.
#[cfg(feature = "timer")]
pub async fn recv_timeout(&mut self, time: Duration) -> Result<T, RecvError> {
match timeout(time, self.recv()).await {
Ok(res) => res,
Err(_) => Err(RecvError::TimeOut),
}
}
/// Closes the channel, prevents the `Sender` from sending more values.
///
/// The `Sender` will fail to call `send` after the `Receiver` called
/// `close`. It will do nothing if the channel is closed.
///
/// # Exampels
/// ```
/// use ylong_runtime::sync::mpsc::unbounded_channel;
/// ylong_runtime::block_on(async move {
/// let (tx, mut rx) = unbounded_channel();
/// assert!(!tx.is_closed());
///
/// rx.close();
/// assert!(tx.is_closed());
/// assert!(tx.send("no receive").is_err());
/// });
/// ```
pub fn close(&mut self) {
self.0.close()
}
}

View File

@ -1,71 +0,0 @@
use crate::sync::LockError;
pub use tokio::sync::Mutex as TokioMutex;
pub use tokio::sync::MutexGuard;
/// An async version of [`std::sync::Mutex`]
///
/// Often it's considered as normal to use [`std::sync::Mutex`] on an asynchronous environment.
/// The primal purpose of this async mutex is to protect shared reference of io, which contains
/// a lot await point during reading and writing. If you only wants to protect a data across
/// different threads, [`std::sync::Mutex`] will probably gain you better performance.
///
/// When using across different futures, users need to wrap the mutex inside an Arc,
/// just like the use of [`std::sync::Mutex`]
#[derive(Debug)]
pub struct Mutex<T: ?Sized>(TokioMutex<T>);
impl<T: Sized> Mutex<T> {
/// Creates a mutex that protects the data passed in.
///
/// # Examples
///
/// ```
/// use ylong_runtime::sync::Mutex;
/// let _lock = Mutex::new(2);
/// ```
pub fn new(t: T) -> Mutex<T> {
Mutex(TokioMutex::new(t))
}
}
unsafe impl<T: ?Sized + Send> Send for Mutex<T> {}
unsafe impl<T: ?Sized + Send> Sync for Mutex<T> {}
impl<T: ?Sized> Mutex<T> {
/// Locks the mutex.
/// If the mutex is already held by others, asynchronously waits for it to release.
///
/// # Examples
/// ```
/// use ylong_runtime::sync::Mutex;
/// ylong_runtime::block_on(async move {
/// let lock = Mutex::new(2);
/// let mut val = lock.lock().await;
/// *val += 1;
/// assert_eq!(*val, 3);
/// });
/// ```
pub async fn lock(&self) -> MutexGuard<'_, T> {
self.0.lock().await
}
/// Attempts to get the mutex.
/// If the lock is already held by others, then LockError will be returned.
/// Otherwise, the mutex guard will be returned.
///
pub fn try_lock(&self) -> Result<MutexGuard<'_, T>, LockError> {
match self.0.try_lock() {
Ok(x) => Ok(x),
Err(_) => Err(LockError),
}
}
/// Gets the mutable reference of the data protected by the lock without actually
/// holding the lock.
///
/// This method takes the mutable reference of the mutex, so there is no need to actually
/// lcok the mutex -- the mutable borrow statically guarantees no locks exist.
pub fn get_mut(&mut self) -> &mut T {
self.0.get_mut()
}
}

View File

@ -1,97 +0,0 @@
use tokio::sync::Notify as TokioNotify;
/// Notifies one or multiple tasks to wake up.
///
/// `Notify` itself does not protect any data. Its only purpose is to signal other
/// tasks to perform an operation.
///
/// # Examples
/// ```
/// use std::sync::Arc;
/// use ylong_runtime::sync::Notify;
/// let notify = Arc::new(Notify::new());
/// let notify2 = notify.clone();
///
/// let _ = ylong_runtime::block_on(async {
/// ylong_runtime::spawn(async move {
/// notify2.notified().await;
/// });
/// notify.notify_one();
/// });
/// ```
pub struct Notify(TokioNotify);
impl Notify {
/// Creates a new Notify.
///
/// # Examples
///
/// ```
/// use ylong_runtime::sync::Notify;
/// let notify = Notify::new();
/// ```
pub fn new() -> Notify {
Notify(TokioNotify::new())
}
/// Asynchronously waits for this Notify to get signaled.
///
/// # Examples
/// ```
/// use std::sync::Arc;
/// use ylong_runtime::sync::Notify;
/// let notify = Arc::new(Notify::new());
/// let notify2 = notify.clone();
/// ylong_runtime::block_on(async {
/// ylong_runtime::spawn(async move {
/// notify2.notified().await;
/// });
/// notify.notify_one();
/// });
/// ```
pub async fn notified(&self) {
self.0.notified().await
}
/// Notifies one task waiting on this `Notify`
///
/// If this method gets called when there is no task waiting on this `Notify`,
/// then the next task called `notified` on it will not get blocked.
///
/// If the method gets called multiple times, only one task will get passed straightly
/// when calling `notified`. Any other task still has to asynchronously wait for it to be
/// released.
pub fn notify_one(&self) {
self.0.notify_one()
}
/// Notifies all tasks waiting on this `Notify`
///
/// Unlike `notify_one`, if this method gets called when there is no task waiting on this `Notify`,
/// the next task called `notified` on it will still get blocked.
///
/// # Examples
/// ```
/// use std::sync::Arc;
/// use std::sync::atomic::AtomicI8;
/// use ylong_runtime::sync::Notify;
/// let notify = Arc::new(Notify::new());
/// let notify2 = notify.clone();
/// let notify3 = notify.clone();
///
/// ylong_runtime::block_on(async {
/// let handle = ylong_runtime::spawn(async move {
/// notify2.notified().await;
///
/// });
/// let handle2 = ylong_runtime::spawn(async move {
/// notify3.notified().await;
/// });
///
/// notify.notify_all();
/// });
/// ```
pub fn notify_all(&self) {
self.0.notify_waiters()
}
}

View File

@ -1,136 +0,0 @@
//! One-shot channel is used to send a single message from a single sender to a single recevier.
//! The [`channel`] function returns a [`Sender`] and [`Receiver`] handle pair that controls channel.
//!
//! The [`Sender`] handle is used by the producer to send a message.
//! The [`Receiver`] handle is used by the consumer to receive the message. It has implemented
//! the `Future` trait
//!
//! The [`Sender::send`] method is not async. It can be called from non-async context.
use crate::sync::error::RecvError;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::sync::oneshot::error::TryRecvError;
use tokio::sync::oneshot::Receiver as TokioReceiver;
use tokio::sync::oneshot::Sender as TokioSender;
/// Sends a single value to the associated [`Receiver`].
/// A [`Sender`] and [`Receiver`] handle pair is created by the [`channel`] function.
///
/// The receiver will fail with a [`RecvError`] if the sender is dropped without sending a value.
#[derive(Debug)]
pub struct Sender<T>(TokioSender<T>);
/// Receives a single value from the associated [`Sender`].
/// A [`Sender`] and [`Receiver`] handle pair is created by the [`channel`] function.
///
/// There is no `recv` method to receive the message because the receiver iteslf implements
/// the [`Future`] trait. To receive a value, `.await` the `Receiver` object directly.
#[derive(Debug)]
pub struct Receiver<T>(TokioReceiver<T>);
/// Creates a new one-shot channel and returns the `Sender` and `Receiver` handle pair
///
/// The `Sender` could only send a single value to the `Receiver`
///
/// # Examples
///
/// ```
/// use ylong_runtime::sync::oneshot;
/// ylong_runtime::block_on(async move {
/// let (tx, rx) = oneshot::channel();
/// ylong_runtime::spawn(async move {
/// assert!(tx.send(6).is_ok());
/// });
/// let recv = rx.await.unwrap();
/// assert_eq!(recv, 6);
/// });
/// ```
pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
let (s, r) = tokio::sync::oneshot::channel();
(Sender(s), Receiver(r))
}
impl<T> Sender<T> {
/// Sends a single value to the associated [`Receiver`], returns the value back
/// if it fails to send.
///
/// The sender will consume itself when calling this method. It can send a single value
/// in synchronous code as it doesn't need waiting.
pub fn send(self, value: T) -> Result<(), T> {
self.0.send(value)
}
/// Checks whether the channel is closed. If so, the sender could not
/// send value anymore.
///
/// # Examples
/// ```
/// use ylong_runtime::sync::oneshot;
/// let (tx, rx) = oneshot::channel::<i8>();
/// assert!(!tx.is_closed());
/// drop(rx);
/// assert!(tx.is_closed());
/// ```
pub fn is_closed(&self) -> bool {
self.0.is_closed()
}
}
impl<T> Receiver<T> {
/// Attempts to receive a value from the associated [`Sender`].
///
/// The method will still receive the result if the `Sender` gets dropped after
/// sending the message.
///
/// # Examples
///
/// ```
/// use ylong_runtime::sync::error::RecvError;
/// use ylong_runtime::sync::oneshot;
/// let (tx, mut rx) = oneshot::channel();
/// assert_eq!(rx.try_recv(), Err(RecvError::Empty));
///
/// tx.send("Hello").unwrap();
/// assert_eq!(rx.try_recv(), Ok("Hello"));
/// ```
pub fn try_recv(&mut self) -> Result<T, RecvError> {
self.0.try_recv().map_err(|e| match e {
TryRecvError::Empty => RecvError::Empty,
TryRecvError::Closed => RecvError::Closed,
})
}
/// Closes the channel, prevents the `Sender` from sending a value.
///
/// The `Sender` will fail to call [`send`] after the `Receiver` called `close`.
/// It will do nothing if the channel is already closed or the message has been
/// already received.
///
/// # Examples
///
/// ```
/// use ylong_runtime::sync::oneshot;
/// let (tx, mut rx) = oneshot::channel();
/// assert!(!tx.is_closed());
///
/// rx.close();
/// assert!(tx.is_closed());
/// assert!(tx.send("no receive").is_err());
/// ```
pub fn close(&mut self) {
self.0.close();
}
}
impl<T> Future for Receiver<T> {
type Output = Result<T, RecvError>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
Pin::new(&mut this.0)
.poll(cx)
.map_err(|_| RecvError::TimeOut)
}
}

View File

@ -1,147 +0,0 @@
use crate::sync::LockError;
use tokio::sync::RwLock as TokioRwLock;
pub use tokio::sync::{RwLockReadGuard, RwLockWriteGuard};
/// An asynchronous version of `RwLock` in 'std'.
///
/// RwLock allows multiple readers or a single writer to operate concurrently.
/// Readers are only allowed to read the data, but the writer is the only one can
/// change the data inside.
///
/// This RwLock's policy is writer-first, to prevent writers from starving.
///
/// # Examples
///
/// ```
/// use ylong_runtime::sync::RwLock;
/// ylong_runtime::block_on(async {
/// let lock = RwLock::new(0);
/// let r1 = lock.read().await;
/// let r2 = lock.read().await;
/// assert_eq!(*r1, 0);
/// assert_eq!(*r2, 0);
/// drop((r1, r2));
///
/// let mut w = lock.write().await;
/// *w += 1;
/// assert_eq!(*w, 1);
/// });
/// ```
pub struct RwLock<T: ?Sized>(TokioRwLock<T>);
unsafe impl<T: ?Sized + Send> Send for RwLock<T> {}
unsafe impl<T: ?Sized + Send + Sync> Sync for RwLock<T> {}
impl<T: Sized> RwLock<T> {
/// Creates a new RwLock. `T` is the data that needs to be protected
/// by this RwLock.
///
/// # Examples
///
/// ```
/// use ylong_runtime::sync::RwLock;
/// let lock = RwLock::new(0);
/// ```
pub fn new(t: T) -> RwLock<T> {
RwLock(TokioRwLock::new(t))
}
}
impl<T: ?Sized> RwLock<T> {
/// Asynchronously acquires the read lock.
///
/// If there is a writer holding the write lock, then this method will wait asynchronously
/// for the write lock to get released.
///
/// Buf if the write lock is not held, it's ok for mulitple readers to hold the read lock
/// concurrently.
///
/// ```
/// use ylong_runtime::sync::RwLock;
/// ylong_runtime::block_on(async {
/// let lock = RwLock::new(0);
/// let r1 = lock.read().await;
/// assert_eq!(*r1, 0);
/// });
/// ```
pub async fn read(&self) -> RwLockReadGuard<'_, T> {
self.0.read().await
}
/// Attempts to get the read lock. If another writer is holding the write lock, then
/// a LockError will be returned. Otherwise, a [`RwLockReadGuard`] will be returned.
///
/// ```
/// use ylong_runtime::sync::RwLock;
/// let lock = RwLock::new(0);
/// let r1 = lock.try_read().unwrap();
/// assert_eq!(*r1, 0);
/// ```
pub fn try_read(&self) -> Result<RwLockReadGuard<'_, T>, LockError> {
self.0.try_read().map_err(|_| LockError {})
}
/// Asynchronously acquires the write lock.
///
/// If there is other readers or writers, then this method will wait asynchronously
/// for them to get released.
///
/// ```
/// use ylong_runtime::sync::RwLock;
/// ylong_runtime::block_on(async {
/// let lock = RwLock::new(0);
/// let r1 = lock.read().await;
///
/// });
/// ```
pub async fn write(&self) -> RwLockWriteGuard<'_, T> {
self.0.write().await
}
/// Attempts to acquire the write lock,
///
/// If any other task holds the read/write lock, a LockError will be returned.
///
/// ```
/// use ylong_runtime::sync::RwLock;
/// let lock = RwLock::new(0);
/// let mut r1 = lock.try_write().unwrap();
/// *r1 += 1;
/// assert_eq!(*r1, 1);
/// ```
pub fn try_write(&self) -> Result<RwLockWriteGuard<'_, T>, LockError> {
self.0.try_write().map_err(|_| LockError {})
}
/// Consumes the lock, and returns the data protected by it.
///
/// # Examples
/// ```
/// use ylong_runtime::sync::RwLock;
/// let lock = RwLock::new(0);
/// assert_eq!(lock.into_inner(), 0);
/// ```
pub fn into_inner(self) -> T
where
T: Sized,
{
self.0.into_inner()
}
/// Gets the mutable reference of the data protected by the lock.
///
/// This method takes the mutable reference of the RwLock, so there is no need to actually
/// lock the mutex -- the mutable borrow statically guarantees no locks exist.
///
/// ```
/// use ylong_runtime::sync::RwLock;
/// ylong_runtime::block_on(async {
/// let mut lock = RwLock::new(0);
/// *lock.get_mut() = 10;
/// assert_eq!(*lock.write().await, 10);
/// });
/// ```
pub fn get_mut(&mut self) -> &mut T {
self.0.get_mut()
}
}

View File

@ -1,143 +0,0 @@
use std::error::Error;
use std::fmt::{Debug, Display, Formatter};
pub use tokio::sync::SemaphorePermit;
use tokio::sync::{Semaphore as TokioSemaphore, TryAcquireError};
const MAX_PERMITS: usize = usize::MAX >> 1;
/// Asynchronous counting semaphore. It allows more than one caller to access the shared resource.
/// Semaphore contains a set of permits. Call `acquire` method and get a permit to access the
/// shared resource. When permits are used up, new requests to acquire permit will wait until
/// [`Semaphore::release`] method is called or permit acquired before gets dropped. When no request
/// is waiting, calling `release` method will add a permit to semaphore.
pub struct Semaphore(TokioSemaphore);
#[derive(Debug, Eq, PartialEq)]
pub enum SemaphoreError {
Overflow,
Empty,
Closed,
}
impl Display for SemaphoreError {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
SemaphoreError::Overflow => write!(f, "permit overflowed"),
SemaphoreError::Empty => write!(f, "no permit inside"),
SemaphoreError::Closed => write!(f, "semaphore closed"),
}
}
}
impl Error for SemaphoreError {}
impl Semaphore {
/// Creates a semaphore with an initial permit value.
///
/// # Examples
/// ```
/// use ylong_runtime::sync::Semaphore;
/// let sem = Semaphore::new(4).unwrap();
/// ```
pub fn new(permits: usize) -> Result<Semaphore, SemaphoreError> {
if permits >= MAX_PERMITS {
return Err(SemaphoreError::Overflow);
}
Ok(Semaphore(TokioSemaphore::new(permits)))
}
/// Gets the number of remaining permits.
///
/// # Examples
///
/// ```
/// use ylong_runtime::sync::Semaphore;
/// let sem = Semaphore::new(4).unwrap();
/// assert_eq!(sem.current_permits(), 4);
/// ```
pub fn current_permits(&self) -> usize {
self.0.available_permits()
}
/// Adds a permit to the semaphore.
///
/// # Exampels
///
/// ```
/// use ylong_runtime::sync::Semaphore;
/// let sem = Semaphore::new(4).unwrap();
/// assert_eq!(sem.current_permits(), 4);
/// sem.release();
/// assert_eq!(sem.current_permits(), 5);
/// ```
pub fn release(&self) {
self.0.add_permits(1);
}
/// Attempts to acquire a permit from semaphore.
///
/// # Examples
/// ```
/// use ylong_runtime::sync::Semaphore;
/// let sem = Semaphore::new(4).unwrap();
/// assert_eq!(sem.current_permits(), 4);
/// let permit = sem.try_acquire().unwrap();
/// assert_eq!(sem.current_permits(), 3);
/// drop(permit);
/// assert_eq!(sem.current_permits(), 4);
/// ```
pub fn try_acquire(&self) -> Result<SemaphorePermit, SemaphoreError> {
self.0.try_acquire().map_err(|e| match e {
TryAcquireError::Closed => SemaphoreError::Closed,
TryAcquireError::NoPermits => SemaphoreError::Empty,
})
}
/// Asynchronously acquires a permit from semaphore.
///
/// # Examples
///
/// ```
/// use ylong_runtime::sync::Semaphore;
/// ylong_runtime::block_on(async {
/// let sem = Semaphore::new(2).unwrap();
/// ylong_runtime::spawn(async move {
/// let _permit = sem.acquire().await.unwrap();
/// });
/// });
/// ```
pub async fn acquire(&self) -> Result<SemaphorePermit, SemaphoreError> {
self.0.acquire().await.map_err(|_| SemaphoreError::Closed)
}
/// Checks whether semaphore is closed. If so, the semaphore could not be acquired anymore.
///
/// # Examples
///
/// ```
/// use ylong_runtime::sync::Semaphore;
/// let sem = Semaphore::new(4).unwrap();
/// assert!(!sem.is_closed());
/// sem.close();
/// assert!(sem.is_closed());
/// ```
pub fn is_closed(&self) -> bool {
self.0.is_closed()
}
/// Closes the semaphore so that it could not be acquired anymore,
/// and it notifies all requests in the waiting list.
///
/// # Examples
///
/// ```
/// use ylong_runtime::sync::Semaphore;
/// let sem = Semaphore::new(4).unwrap();
/// assert!(!sem.is_closed());
/// sem.close();
/// assert!(sem.is_closed());
/// ```
pub fn close(&self) {
self.0.close()
}
}

View File

@ -1,56 +0,0 @@
use crate::executor::get_global_runtime;
use crate::join_handle::JoinHandle;
use crate::task::PriorityLevel;
use std::future::Future;
/// Tasks attribute
pub struct TaskBuilder {
pub(crate) name: Option<String>,
pub(crate) pri: Option<PriorityLevel>,
}
impl Default for TaskBuilder {
fn default() -> Self {
TaskBuilder::new()
}
}
impl TaskBuilder {
/// Creates a new `TaskBuilder` with a default setting.
pub fn new() -> Self {
TaskBuilder {
name: None,
pri: None,
}
}
/// Sets the name of the task
pub fn name(mut self, name: String) -> Self {
self.name = Some(name);
self
}
/// Sets the priority of the task
pub fn pri(mut self, pri_level: PriorityLevel) -> Self {
self.pri = Some(pri_level);
self
}
/// Using the current task setting, spawns a task onto the global runtime
pub fn spawn<T, R>(&self, task: T) -> JoinHandle<R>
where
T: Future<Output = R> + Send + 'static,
R: Send + 'static,
{
get_global_runtime().spawn(task)
}
/// Using the current task setting, spawns a task onto the blocking pool.
pub fn spawn_blocking<T, R>(&self, task: T) -> JoinHandle<R>
where
T: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
get_global_runtime().spawn_blocking(task)
}
}

View File

@ -1,40 +0,0 @@
use std::future::Future;
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};
/// A handle to the actual spawned task.
///
/// This can be considered as the equivalent of [`std::thread::JoinHandle`]
/// for a ylong task rather than a thread.
///
/// It could be used to join the corresponding task or cancel it.
/// If a `JoinHandle` is dropped, then the task continues executing in the background
/// and its return value is lost. There is no way to join the task after its JoinHandle
/// is dropped.
///
/// # Examples
/// ```
/// let handle = ylong_runtime::spawn(async {
/// let handle2 = ylong_runtime::spawn(async {1});
/// assert_eq!(handle2.await.unwrap(), 1);
/// });
/// ylong_runtime::block_on(handle).unwrap();
/// ```
pub struct JoinHandle<R>(pub(crate) tokio::task::JoinHandle<R>);
impl<R> Future for JoinHandle<R> {
type Output = io::Result<R>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
Pin::new(&mut this.0).poll(cx).map_err(io::Error::from)
}
}
impl<R> JoinHandle<R> {
/// Abort the task associated with the handle.
pub fn cancel(&self) {
self.0.abort()
}
}

View File

@ -1,11 +0,0 @@
pub mod builder;
pub mod join_handle;
/// Task priority level, ranges from high to low
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub enum PriorityLevel {
AbsHigh,
High,
Low,
AbsLow,
}

View File

@ -1,34 +0,0 @@
use std::time::{Duration, Instant};
use tokio::time::Interval as TokioInterval;
pub struct Interval(TokioInterval);
/// Creates a new [`Interval`] that yields with an interval of `period`. The first tick
/// completes immediately. The default [`TimeoutPolicies`] is [`TimeoutPolicies::Burst`]
pub fn interval(period: Duration) -> Interval {
Interval(tokio::time::interval(period))
}
/// Creates new [`Interval`] that yields with interval of `period` with the
/// first tick completing at `start`. The default [`TimeoutPolicies`] is
/// [`TimeoutPolicies::Burst`]
pub fn interval_at(start: Instant, period: Duration) -> Interval {
Interval(tokio::time::interval_at(start.into(), period))
}
impl Interval {
/// Waits until the next instant is reached
pub async fn tick(&mut self) -> Instant {
self.0.tick().await.into_std()
}
/// Resets the `Interval` from now on
pub fn reset(&mut self) {
self.0.reset()
}
/// Gets the period of the `Interval`
pub fn period(&self) -> Duration {
self.0.period()
}
}

View File

@ -1,3 +0,0 @@
pub mod interval;
pub mod sleep;
pub mod timeout;

View File

@ -1 +0,0 @@
pub use tokio::time::sleep;

View File

@ -1 +0,0 @@
pub use tokio::time::timeout;