initial commit

This commit is contained in:
Sean McArthur 2018-03-15 15:26:45 -07:00
commit 770ebf7674
5 changed files with 426 additions and 0 deletions

4
.gitignore vendored Normal file
View File

@ -0,0 +1,4 @@
/target
**/*.rs.bk
Cargo.lock

15
Cargo.toml Normal file
View File

@ -0,0 +1,15 @@
[package]
name = "want"
version = "0.0.0" # remember to update html_root_url
description = "Detect when another Future wants a result."
keywords = ["futures", "channel"]
authors = ["Sean McArthur <sean@seanmonstar.com>"]
license = "MIT"
repository = "https://github.com/seanmonstar/want"
homepage = "https://github.com/seanmonstar/want"
documentation = "https://docs.rs/want"
[dependencies]
futures = "0.1"
log = "0.4"
try-lock = "0.1"

20
LICENSE Normal file
View File

@ -0,0 +1,20 @@
Copyright (c) 2018 Sean McArthur
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.

3
README.md Normal file
View File

@ -0,0 +1,3 @@
# Want
A `Future`s channel-like utility to signal when a value is wanted.

384
src/lib.rs Normal file
View File

@ -0,0 +1,384 @@
#![doc(html_root_url = "https://docs.rs/want/0.0.0")]
#![deny(warnings)]
#![deny(missing_docs)]
#![deny(missing_debug_implementations)]
//! A Futures channel-like utility to signal when a value is wanted.
//!
//! Futures are supposed to be lazy, and only starting work if `Future::poll`
//! is called. The same is true of `Stream`s, but when using a channel as
//! a `Stream`, it can be hard to know if the receiver is ready for the next
//! value.
//!
//! Put another way, given a `(tx, rx)` from `futures::sync::mpsc::channel()`,
//! how can the sender (`tx`) know when the receiver (`rx`) actually wants more
//! work to be produced? Just because there is room in the channel buffer
//! doesn't mean the work would be used by the receiver.
//!
//! This is where something like `want` comes in. Added to a channel, you can
//! make sure that the `tx` only creates the message and sends it when the `rx`
//! has `poll()` for it, and the buffer was empty.
extern crate futures;
#[macro_use]
extern crate log;
extern crate try_lock;
use std::fmt;
use std::mem;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use futures::{Async, Poll};
use futures::task::{self, Task};
use try_lock::TryLock;
/// Create a new `want` channel.
pub fn new() -> (Giver, Taker) {
let inner = Arc::new(Inner {
state: AtomicUsize::new(State::Idle.into()),
task: TryLock::new(None),
});
let inner2 = inner.clone();
(
Giver {
inner: inner,
},
Taker {
inner: inner2,
},
)
}
/// An entity that gives a value when wanted.
pub struct Giver {
inner: Arc<Inner>,
}
/// An entity that wants a value.
pub struct Taker {
inner: Arc<Inner>,
}
/// The `Taker` has canceled its interest in a value.
pub struct Closed {
_inner: (),
}
#[derive(Clone, Copy, Debug)]
enum State {
Idle,
Want,
Give,
Closed,
}
impl From<State> for usize {
fn from(s: State) -> usize {
match s {
State::Idle => 0,
State::Want => 1,
State::Give => 2,
State::Closed => 3,
}
}
}
impl From<usize> for State {
fn from(num: usize) -> State {
match num {
0 => State::Idle,
1 => State::Want,
2 => State::Give,
3 => State::Closed,
_ => unreachable!("unknown state: {}", num),
}
}
}
struct Inner {
state: AtomicUsize,
task: TryLock<Option<Task>>,
}
// ===== impl Giver ======
impl Giver {
/// Poll whether the `Taker` has registered interest in another value.
///
/// - If the `Taker` has called `want()`, this returns `Async::Ready(())`.
/// - If the `Taker` has not called `want()` since last poll, this
/// returns `Async::NotReady`, and parks the current task to be notified
/// when the `Taker` does call `want()`.
/// - If the `Taker` has canceled (or dropped), this returns `Closed`.
pub fn poll_want(&mut self) -> Poll<(), Closed> {
loop {
let state = self.inner.state.load(Ordering::SeqCst).into();
match state {
State::Want => {
trace!("poll_want: taker wants!");
// only set to IDLE if it is still Want
self.inner.state.compare_and_swap(
State::Want.into(),
State::Idle.into(),
Ordering::SeqCst,
);
return Ok(Async::Ready(()));
},
State::Closed => {
trace!("poll_want: closed");
return Err(Closed { _inner: () });
},
State::Idle | State::Give => {
// Taker doesn't want anything yet, so park.
if let Some(mut locked) = self.inner.task.try_lock() {
// While we have the lock, try to set to GIVE.
let old = self.inner.state.compare_and_swap(
state.into(),
State::Give.into(),
Ordering::SeqCst,
);
// If it's still the first state (Idle or Give), park current task.
if old == state.into() {
trace!("poll_want: taker doesn't want, parking task");
let park = locked.as_ref()
.map(|t| t.will_notify_current())
.unwrap_or(true);
if park {
mem::replace(&mut *locked, Some(task::current()))
.map(|prev_task| {
// there was an old task parked here.
// it might be waiting to be notified,
// so poke it before dropping.
prev_task.notify();
});
}
return Ok(Async::NotReady)
}
// Otherwise, something happened! Go around the loop again.
} else {
// if we couldn't take the lock, then a Taker has it.
// The *ONLY* reason is because it is in the process of notifying us
// of its want.
//
// We need to loop again to see what state it was changed to.
}
},
}
}
}
/// Check if the `Taker` has called `want()` without parking a task.
///
/// This is safe to call outside of a futures task context, but other
/// means of being notified is left to the user.
pub fn is_wanting(&self) -> bool {
self.inner.state.load(Ordering::SeqCst) == State::Want.into()
}
/// Check if the `Taker` has canceled interest without parking a task.
pub fn is_canceled(&self) -> bool {
self.inner.state.load(Ordering::SeqCst) == State::Closed.into()
}
}
impl fmt::Debug for Giver {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("Giver")
.field("state", &self.inner.state())
.finish()
}
}
// ===== impl Taker ======
impl Taker {
/// Signal to the `Giver` that the want is canceled.
///
/// This is useful to tell that the channel is closed if you cannot
/// drop the value yet.
pub fn cancel(&mut self) {
trace!("signal: {:?}", State::Closed);
self.signal(State::Closed)
}
/// Signal to the `Giver` that a value is wanted.
pub fn want(&mut self) {
trace!("signal: {:?}", State::Want);
self.signal(State::Want)
}
fn signal(&mut self, state: State) {
let old_state = self.inner.state.swap(state.into(), Ordering::SeqCst).into();
match old_state {
State::Idle | State::Want | State::Closed => (),
State::Give => {
loop {
if let Some(mut locked) = self.inner.task.try_lock() {
if let Some(task) = locked.take() {
trace!("signal found waiting giver, notifying");
task.notify();
}
return;
} else {
// if we couldn't take the lock, then a Giver has it.
// The *ONLY* reason is because it is in the process of parking.
//
// We need to loop and take the lock so we can notify this task.
}
}
},
}
}
}
impl Drop for Taker {
fn drop(&mut self) {
self.signal(State::Closed);
}
}
impl fmt::Debug for Taker {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("Taker")
.field("state", &self.inner.state())
.finish()
}
}
// ===== impl Closed ======
impl fmt::Debug for Closed {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("Closed")
.finish()
}
}
// ===== impl Inner ======
impl Inner {
fn state(&self) -> State {
self.state.load(Ordering::SeqCst).into()
}
}
#[cfg(test)]
mod tests {
use std::thread;
use futures::{Async, Stream};
use futures::future::{poll_fn, Future};
use futures::sync::{mpsc, oneshot};
use super::*;
#[test]
fn want_ready() {
let (mut gv, mut tk) = new();
tk.want();
assert!(gv.poll_want().unwrap().is_ready());
}
#[test]
fn want_notify() {
let (mut gv, mut tk) = new();
let (tx, rx) = oneshot::channel();
thread::spawn(move || {
tk.want();
// use a oneshot to keep this thread alive
// until other thread was notified of want
rx.wait().expect("rx");
});
poll_fn(move || {
gv.poll_want()
}).wait().expect("wait");
tx.send(()).expect("tx");
}
#[test]
fn cancel() {
// explicit
let (mut gv, mut tk) = new();
assert!(!gv.is_canceled());
tk.cancel();
assert!(gv.is_canceled());
assert!(gv.poll_want().is_err());
// implicit
let (mut gv, tk) = new();
assert!(!gv.is_canceled());
drop(tk);
assert!(gv.is_canceled());
assert!(gv.poll_want().is_err());
// notifies
let (mut gv, tk) = new();
thread::spawn(move || {
let _tk = tk;
// and dropped
});
poll_fn(move || {
gv.poll_want()
}).wait().expect_err("wait");
}
#[test]
fn stress() {
let nthreads = 5;
let nwants = 100;
for _ in 0..nthreads {
let (mut gv, mut tk) = new();
let (mut tx, mut rx) = mpsc::channel(0);
// rx thread
thread::spawn(move || {
let mut cnt = 0;
poll_fn(move || {
while cnt < nwants {
let n = match rx.poll().expect("rx poll") {
Async::Ready(n) => n.expect("rx opt"),
Async::NotReady => {
tk.want();
return Ok(Async::NotReady);
},
};
assert_eq!(cnt, n);
cnt += 1;
}
Ok::<_, ()>(Async::Ready(()))
}).wait().expect("rx wait");
});
// tx thread
thread::spawn(move || {
let mut cnt = 0;
let nsent = poll_fn(move || {
loop {
while let Ok(()) = tx.try_send(cnt) {
cnt += 1;
}
match gv.poll_want() {
Ok(Async::Ready(_)) => (),
Ok(Async::NotReady) => return Ok::<_, ()>(Async::NotReady),
Err(_) => return Ok(Async::Ready(cnt)),
}
}
}).wait().expect("tx wait");
assert_eq!(nsent, nwants);
}).join().expect("thread join");
}
}
}