Revert "util: implement stream debounce combinator (#747)" (#834)

This reverts commit 7a49ebb65e.

The commit conflicted with another change that was merged, causing CI to fail. The public API
also requires a bit more refinement (#833) and Tokio crates need to be released.
This commit is contained in:
Carl Lerche 2019-01-06 16:56:49 -08:00 committed by GitHub
parent 7a49ebb65e
commit 74c73b218e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 1 additions and 647 deletions

View File

@ -1,8 +1,7 @@
#[cfg(feature = "timer")] #[cfg(feature = "timer")]
use tokio_timer::{ use tokio_timer::{
debounce::{Debounce, DebounceBuilder, Edge},
throttle::Throttle, throttle::Throttle,
timeout::Timeout, Timeout,
}; };
use futures::Stream; use futures::Stream;
@ -25,63 +24,6 @@ use std::time::Duration;
/// ///
/// [`timeout`]: #method.timeout /// [`timeout`]: #method.timeout
pub trait StreamExt: Stream { pub trait StreamExt: Stream {
/// Debounce the stream on the trailing edge using the given duration.
///
/// Errors will pass through without being debounced. Debouncing will
/// happen on the trailing edge. This means all items (except the last
/// one) will be discarded until the delay has elapsed without an item
/// being passed through. The last item that was passed through will
/// be returned.
///
/// Care must be taken that this stream returns `Async::NotReady` at some point,
/// otherwise the debouncing implementation will overflow the stack during
/// `.poll()` (i. e. don't use this directly on `stream::repeat`).
///
/// See also [`debounce_builder`], which allows more configuration over how the
/// debouncing is done.
///
/// [`debounce_builder`]: #method.debounce_builder
fn debounce(self, dur: Duration) -> Debounce<Self>
where Self: Sized
{
self.debounce_builder()
.duration(dur)
.edge(Edge::Trailing)
.build()
}
/// Create a builder that builds a debounced version of this stream.
///
/// The returned builder can be used to configure the debouncing process.
///
/// Care must be taken that this stream returns `Async::NotReady` at some point,
/// otherwise the debouncing implementation will overflow the stack during
/// `.poll()` (i. e. don't use this directly on `stream::repeat`).
fn debounce_builder(self) -> DebounceBuilder<Self>
where Self: Sized
{
DebounceBuilder::from_stream(self)
}
/// Sample the stream at the given `interval`.
///
/// Sampling works similar to debouncing in that frequent values will be
/// ignored. Sampling, however, ensures that an item is passed through at
/// least after every `interval`. Debounce, on the other hand, would not
/// pass items through until there has been enough "silence".
///
/// Care must be taken that this stream returns `Async::NotReady` at some point,
/// otherwise the sampling implementation will overflow the stack during
/// `.poll()` (i. e. don't use this directly on `stream::repeat`).
fn sample(self, interval: Duration) -> Debounce<Self>
where Self: Sized
{
self.debounce_builder()
.max_wait(interval)
.edge(Edge::Leading)
.build()
}
/// Throttle down the stream by enforcing a fixed delay between items. /// Throttle down the stream by enforcing a fixed delay between items.
/// ///
/// Errors are also delayed. /// Errors are also delayed.

View File

@ -1,408 +0,0 @@
//! Debounce streams on the leading or trailing edge or both edges for a certain
//! amount of time.
//!
//! See [`Debounce`] for more details.
//!
//! [`Debounce`]: struct.Debounce.html
use {clock, Delay, Error};
use futures::{
future::Either,
prelude::*,
};
use std::{
cmp,
error::Error as StdError,
fmt::{Display, Formatter, Result as FmtResult},
time::{Duration, Instant},
};
/// Debounce streams on the leading or trailing edge or both.
///
/// Useful for slowing processing of e. g. user input or network events
/// to a bearable rate.
#[derive(Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct Debounce<T: Stream> {
delay: Option<Delay>,
duration: Duration,
edge: Edge,
last_item: Option<T::Item>,
max_wait: Option<Duration>,
max_wait_to: Option<Instant>,
stream: T,
}
/// Builds a debouncing stream.
#[derive(Debug)]
pub struct DebounceBuilder<T> {
duration: Option<Duration>,
edge: Option<Edge>,
max_wait: Option<Duration>,
stream: T,
}
/// Either the error of the underlying stream, or an error within tokio's
/// timing machinery.
#[derive(Debug)]
pub struct DebounceError<T>(Either<T, Error>);
/// Which edge the debounce tiggers on.
#[derive(Copy, Clone, Debug, Eq, Hash, PartialEq)]
pub enum Edge {
/// The debounce triggers on the leading edge.
///
/// The first stream item will be returned immediately and subsequent ones
/// will be ignored until the delay has elapsed without items passing through.
Leading,
/// The debounce triggers on the trailing edge.
///
/// All items (except the last one) are thrown away until the delay has elapsed
/// without items passing through. The last item is returned.
Trailing,
/// The debounce triggers on both the leading and the trailing edge.
///
/// The first and the last items will be returned.
///
/// Note that trailing edge behavior will only be visible if the underlying
/// stream fires at least twice during the debouncing period.
Both,
}
impl<T: Stream> Debounce<T> {
/// Constructs a new stream that debounces the items passed through.
///
/// Care must be taken that `stream` returns `Async::NotReady` at some point,
/// otherwise the debouncing implementation will overflow the stack during
/// `.poll()` (i. e. don't use this directly on `stream::repeat`).
pub fn new(
stream: T,
duration: Duration,
edge: Edge,
max_wait: Option<Duration>,
) -> Self {
Self {
delay: None,
duration,
edge,
last_item: None,
max_wait,
max_wait_to: None,
stream,
}
}
/// Acquires a reference to the underlying stream that this combinator is
/// pulling from.
pub fn get_ref(&self) -> &T {
&self.stream
}
/// Acquires a mutable reference to the underlying stream that this combinator
/// is pulling from.
///
/// Note that care must be taken to avoid tampering with the state of the stream
/// which may otherwise confuse this combinator.
pub fn get_mut(&mut self) -> &mut T {
&mut self.stream
}
/// Consumes this combinator, returning the underlying stream.
///
/// Note that this may discard intermediate state of this combinator, so care
/// should be taken to avoid losing resources when this is called.
pub fn into_inner(self) -> T {
self.stream
}
/// Computes the instant at which the next debounce delay elapses.
fn delay_time(&mut self) -> Instant {
let next = clock::now() + self.duration;
if let Some(to) = self.max_wait_to {
cmp::min(next, to)
} else {
next
}
}
/// Polls the underlying delay future.
fn poll_delay(d: &mut Delay) -> Poll<(), <Self as Stream>::Error> {
d.poll().map_err(DebounceError::from_timer_error)
}
/// Polls the underlying stream.
fn poll_stream(
&mut self,
) -> Poll<Option<<Self as Stream>::Item>, <Self as Stream>::Error> {
self.stream.poll().map_err(DebounceError::from_stream_error)
}
/// Starts a new delay using the current duration and maximum waiting time.
fn start_delay(&mut self) {
self.max_wait_to = self.max_wait.map(|dur| clock::now() + dur);
self.delay = Some(Delay::new(self.delay_time()));
}
}
impl<T: Stream> Stream for Debounce<T> {
type Item = T::Item;
type Error = DebounceError<T::Error>;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
match self.delay.take() {
Some(mut d) => match Self::poll_delay(&mut d)? {
// Delay has woken us up and is over, if we're trailing edge this
// means we need to return the last item.
Async::Ready(_) => {
if self.edge.is_trailing() {
if let Some(item) = self.last_item.take() {
return Ok(Async::Ready(Some(item)));
}
}
return Ok(Async::NotReady);
},
// The stream has woken us up, but we have a delay.
Async::NotReady => match self.poll_stream()? {
// We have gotten an item, but we're currently blocked on
// the delay. Save it for later and reset the timer.
Async::Ready(Some(item)) => {
d.reset(self.delay_time());
self.delay = Some(d);
self.last_item = Some(item);
self.poll()
},
// The stream has ended. Communicate this immediately to the
// following stream.
Async::Ready(None) => Ok(Async::Ready(None)),
Async::NotReady => {
self.delay = Some(d);
Ok(Async::NotReady)
},
},
},
None => match try_ready!(self.poll_stream()) {
// We have gotten an item. Set up the delay for future items to be
// debounced. If we're on leading edge, return the item, otherwise
// save it for later.
Some(item) => {
self.start_delay();
if self.edge.is_leading() {
Ok(Async::Ready(Some(item)))
} else {
self.last_item = Some(item);
self.poll()
}
},
// The stream has ended. Communicate this immediately to the
// following stream.
None => Ok(Async::Ready(None)),
},
}
}
}
impl<T> DebounceBuilder<T> {
/// Creates a new builder from the given debounce stream.
///
/// Care must be taken that `stream` returns `Async::NotReady` at some point,
/// otherwise the debouncing implementation will overflow the stack during
/// `.poll()` (i. e. don't use this directly on `stream::repeat`).
pub fn from_stream(stream: T) -> Self {
DebounceBuilder {
duration: None,
edge: None,
max_wait: None,
stream: stream,
}
}
/// Sets the duration to debounce to.
///
/// If no duration is set here but [`max_wait`] is given instead, the resulting
/// stream will sample the underlying stream at the interval given by
/// [`max_wait`] instead of debouncing it.
///
/// [`max_wait`]: #method.max_wait
pub fn duration(mut self, dur: Duration) -> Self {
self.duration = Some(dur);
self
}
/// Sets the debouncing edge.
///
/// An edge MUST be set before trying to [`build`] the debounce stream.
///
/// [`build`]: #method.build
pub fn edge(mut self, edge: Edge) -> Self {
self.edge = Some(edge);
self
}
/// Sets the maximum waiting time.
///
/// If only a `max_wait` is given (and no [`duration`]), the resulting stream
/// will sample the underlying stream at the interval given by `max_wait`
/// instead of debouncing it.
/// Sampling cannot occur on both edges. Trying to build a sampling stream
/// on both edges will panic.
///
/// [`duration`]: #method.duration
pub fn max_wait(mut self, max_wait: Duration) -> Self {
self.max_wait = Some(max_wait);
self
}
}
impl<T: Stream> DebounceBuilder<T> {
/// Builds the debouncing stream.
///
/// Panics if the edge or the duration is unspecified, or if only `max_wait`
/// together with `Edge::Both` was specified.
pub fn build(self) -> Debounce<T> {
let edge = self.edge.expect("missing debounce edge");
// If we've only been given a maximum waiting time, this means we need to
// sample the stream at the interval given by max_wait instead of
// debouncing it.
let duration = match self.max_wait {
Some(max_wait) => match self.duration {
Some(dur) => dur,
None => {
// Sampling on both edges leads to unexpected behavior where, when a
// sample interval elapses, two items will be returned.
assert!(edge != Edge::Both, "cannot sample on both edges");
// The actual duration added here doesn't matter, as long as its
// means the result is longer than `max_wait` and we have more than
// a millisecond (tokio timer precision).
max_wait + Duration::from_secs(1)
},
},
None => self.duration.expect("missing debounce duration")
};
Debounce::new(
self.stream,
duration,
edge,
self.max_wait,
)
}
}
impl<T> DebounceError<T> {
/// Creates an error from the given stream error.
pub fn from_stream_error(err: T) -> Self {
DebounceError(Either::A(err))
}
/// Creates an error from the given timer error.
pub fn from_timer_error(err: Error) -> Self {
DebounceError(Either::B(err))
}
/// Gets the underlying stream error, if present.
pub fn get_stream_error(&self) -> Option<&T> {
match self.0 {
Either::A(ref err) => Some(err),
_ => None,
}
}
/// Gets the underlying timer error, if present.
pub fn get_timer_error(&self) -> Option<&Error> {
match self.0 {
Either::B(ref err) => Some(err),
_ => None,
}
}
/// Attempts to convert the error into the stream error.
pub fn into_stream_error(self) -> Option<T> {
match self.0 {
Either::A(err) => Some(err),
_ => None,
}
}
/// Attempts to convert the error into the timer error.
pub fn into_timer_error(self) -> Option<Error> {
match self.0 {
Either::B(err) => Some(err),
_ => None,
}
}
/// Determines whether the underlying error is a stream error.
pub fn is_stream_error(&self) -> bool {
!self.is_timer_error()
}
/// Determines whether the underlying error is an error within
/// tokio's timer machinery.
pub fn is_timer_error(&self) -> bool {
match self.0 {
Either::B(_) => true,
_ => false,
}
}
}
impl<T: StdError> Display for DebounceError<T> {
fn fmt(&self, f: &mut Formatter) -> FmtResult {
match self.0 {
Either::A(ref err) => write!(f, "stream error: {}", err),
Either::B(ref err) => write!(f, "timer error: {}", err),
}
}
}
impl<T: StdError + 'static> StdError for DebounceError<T> {
fn description(&self) -> &str {
match self.0 {
Either::A(_) => "stream error",
Either::B(_) => "timer error",
}
}
fn cause(&self) -> Option<&StdError> {
match self.0 {
Either::A(ref err) => Some(err),
Either::B(ref err) => Some(err),
}
}
}
impl Edge {
/// The edge is either leading edge or both edges.
pub fn is_leading(&self) -> bool {
match self {
Edge::Leading | Edge::Both => true,
_ => false,
}
}
/// The edge is either trailing edge or both edges.
pub fn is_trailing(&self) -> bool {
match self {
Edge::Trailing | Edge::Both => true,
_ => false
}
}
}

View File

@ -5,9 +5,6 @@
//! //!
//! This crate provides a number of utilities for working with periods of time: //! This crate provides a number of utilities for working with periods of time:
//! //!
//! * [`Debounce`]: Wraps a stream, throwing items away until there has been at
//! least some amount of time without items having passed through.
//!
//! * [`Delay`]: A future that completes at a specified instant in time. //! * [`Delay`]: A future that completes at a specified instant in time.
//! //!
//! * [`Interval`] A stream that yields at fixed time intervals. //! * [`Interval`] A stream that yields at fixed time intervals.
@ -27,7 +24,6 @@
//! //!
//! [`Delay`]: struct.Delay.html //! [`Delay`]: struct.Delay.html
//! [`Throttle`]: throttle/struct.Throttle.html //! [`Throttle`]: throttle/struct.Throttle.html
//! [`Debounce`]: debounce/struct.Debounce.html
//! [`Timeout`]: struct.Timeout.html //! [`Timeout`]: struct.Timeout.html
//! [`Interval`]: struct.Interval.html //! [`Interval`]: struct.Interval.html
//! [`Timer`]: timer/struct.Timer.html //! [`Timer`]: timer/struct.Timer.html
@ -40,7 +36,6 @@ extern crate futures;
extern crate slab; extern crate slab;
pub mod clock; pub mod clock;
pub mod debounce;
pub mod delay_queue; pub mod delay_queue;
pub mod throttle; pub mod throttle;
pub mod timeout; pub mod timeout;

View File

@ -1,175 +0,0 @@
extern crate futures;
extern crate tokio;
extern crate tokio_executor;
extern crate tokio_timer;
#[macro_use]
mod support;
use support::*;
use futures::{
prelude::*,
sync::mpsc,
};
use tokio::util::StreamExt;
use tokio_timer::{
debounce::{Debounce, Edge},
Timer,
};
#[test]
fn debounce_leading() {
mocked(|timer, _| {
let (debounced, tx) = make_debounced(Edge::Leading, None);
let items = smoke_tests(timer, tx, debounced);
assert_eq!(items.len(), 1);
assert_eq!(items[0], 0);
});
}
#[test]
fn debounce_trailing_many() {
mocked(|timer, _| {
let (mut debounced, tx) = make_debounced(Edge::Trailing, None);
// Send in two items.
tx.unbounded_send(1).unwrap();
tx.unbounded_send(2).unwrap();
// We shouldn't be ready yet, but we should have stored 2 as our last item.
assert_not_ready!(debounced);
// Go past our delay instant.
advance(timer, ms(11));
// Poll again, we should get 2.
assert_ready_eq!(debounced, Some(2));
// No more items in the stream, delay finished: we should be NotReady.
assert_not_ready!(debounced);
});
}
#[test]
fn debounce_trailing() {
mocked(|timer, _| {
let (debounced, tx) = make_debounced(Edge::Trailing, None);
let items = smoke_tests(timer, tx, debounced);
assert_eq!(items.len(), 1);
assert_eq!(items[0], 4);
});
}
#[test]
fn debounce_both() {
mocked(|timer, _| {
let (debounced, tx) = make_debounced(Edge::Both, None);
let items = smoke_tests(timer, tx, debounced);
assert_eq!(items.len(), 2);
assert_eq!(items[0], 0);
assert_eq!(items[1], 4);
});
}
#[test]
fn sample_leading() {
mocked(|timer, _| {
let (debounced, tx) = make_debounced(Edge::Leading, Some(3));
let items = smoke_tests(timer, tx, debounced);
assert_eq!(items.len(), 2);
assert_eq!(items[0], 0);
assert_eq!(items[1], 3);
});
}
#[test]
fn sample_trailing() {
mocked(|timer, _| {
let (debounced, tx) = make_debounced(Edge::Trailing, Some(3));
let items = smoke_tests(timer, tx, debounced);
assert_eq!(items.len(), 2);
assert_eq!(items[0], 2);
assert_eq!(items[1], 4);
});
}
#[test]
#[should_panic]
fn sample_both_panics() {
let (_, rx) = mpsc::unbounded::<()>();
let _ = rx.debounce_builder()
.max_wait(ms(10))
.edge(Edge::Both)
.build();
}
#[test]
fn combinator_debounce() {
let (_, rx1) = mpsc::unbounded::<()>();
let _ = rx1.debounce(ms(100));
}
#[test]
fn combinator_sample() {
let (_, rx1) = mpsc::unbounded::<()>();
let _ = rx1.sample(ms(100));
}
fn make_debounced(
edge: Edge,
max_wait: Option<u64>,
) -> (impl Stream<Item=u32, Error=()>, mpsc::UnboundedSender<u32>) {
let (tx, rx) = mpsc::unbounded();
let debounced = Debounce::new(
rx,
ms(10),
edge,
max_wait.map(ms),
)
.map_err(|e| panic!("stream error: {:?}", e));
(debounced, tx)
}
fn smoke_tests(
timer: &mut Timer<MockPark>,
tx: mpsc::UnboundedSender<u32>,
mut s: impl Stream<Item=u32, Error=()>,
) -> Vec<u32> {
assert_not_ready!(s);
let mut result = Vec::new();
// Drive forward 1ms at a time adding items to the stream
for i in 0..5 {
tx.unbounded_send(i).unwrap();
advance(timer, ms(1));
match s.poll().unwrap() {
Async::Ready(Some(it)) => result.push(it),
Async::Ready(None) => break,
Async::NotReady => {},
}
}
// Pull final items out of stream
for _ in 0..100 {
match s.poll().unwrap() {
Async::Ready(Some(it)) => result.push(it),
Async::Ready(None) => break,
Async::NotReady => {},
}
advance(timer, ms(1));
}
advance(timer, ms(1000));
assert_not_ready!(s);
result
}