diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index b71d9d0c..2c516e31 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -243,13 +243,13 @@ jobs: # rustup component add clippy # - run: cargo clippy --workspace --all-features --all-targets - rustfmt: + fmt: runs-on: ubuntu-latest steps: - uses: actions/checkout@v2 - name: Install Rust run: rustup update stable && rustup default stable - - run: cargo fmt --all -- --check + - run: tools/fmt.sh docs: name: cargo doc diff --git a/.rustfmt.toml b/.rustfmt.toml index 2a35f023..2a79d927 100644 --- a/.rustfmt.toml +++ b/.rustfmt.toml @@ -1 +1,2 @@ use_small_heuristics = "Max" +edition = "2018" diff --git a/futures-channel/src/lock.rs b/futures-channel/src/lock.rs index 5eecdd9a..b328d0f7 100644 --- a/futures-channel/src/lock.rs +++ b/futures-channel/src/lock.rs @@ -6,8 +6,8 @@ use core::cell::UnsafeCell; use core::ops::{Deref, DerefMut}; -use core::sync::atomic::Ordering::SeqCst; use core::sync::atomic::AtomicBool; +use core::sync::atomic::Ordering::SeqCst; /// A "mutex" around a value, similar to `std::sync::Mutex`. /// @@ -37,10 +37,7 @@ unsafe impl Sync for Lock {} impl Lock { /// Creates a new lock around the given value. pub(crate) fn new(t: T) -> Self { - Self { - locked: AtomicBool::new(false), - data: UnsafeCell::new(t), - } + Self { locked: AtomicBool::new(false), data: UnsafeCell::new(t) } } /// Attempts to acquire this lock, returning whether the lock was acquired or diff --git a/futures-channel/src/mpsc/mod.rs b/futures-channel/src/mpsc/mod.rs index c32ad4b6..28612da8 100644 --- a/futures-channel/src/mpsc/mod.rs +++ b/futures-channel/src/mpsc/mod.rs @@ -79,13 +79,13 @@ // by the queue structure. use futures_core::stream::{FusedStream, Stream}; -use futures_core::task::{Context, Poll, Waker}; use futures_core::task::__internal::AtomicWaker; +use futures_core::task::{Context, Poll, Waker}; use std::fmt; use std::pin::Pin; -use std::sync::{Arc, Mutex}; use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering::SeqCst; +use std::sync::{Arc, Mutex}; use std::thread; use crate::mpsc::queue::Queue; @@ -209,9 +209,7 @@ impl SendError { impl fmt::Debug for TrySendError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("TrySendError") - .field("kind", &self.err.kind) - .finish() + f.debug_struct("TrySendError").field("kind", &self.err.kind).finish() } } @@ -251,8 +249,7 @@ impl TrySendError { impl fmt::Debug for TryRecvError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_tuple("TryRecvError") - .finish() + f.debug_tuple("TryRecvError").finish() } } @@ -335,10 +332,7 @@ struct SenderTask { impl SenderTask { fn new() -> Self { - Self { - task: None, - is_parked: false, - } + Self { task: None, is_parked: false } } fn notify(&mut self) { @@ -381,9 +375,7 @@ pub fn channel(buffer: usize) -> (Sender, Receiver) { maybe_parked: false, }; - let rx = Receiver { - inner: Some(inner), - }; + let rx = Receiver { inner: Some(inner) }; (Sender(Some(tx)), rx) } @@ -399,7 +391,6 @@ pub fn channel(buffer: usize) -> (Sender, Receiver) { /// the channel. Using an `unbounded` channel has the ability of causing the /// process to run out of memory. In this case, the process will be aborted. pub fn unbounded() -> (UnboundedSender, UnboundedReceiver) { - let inner = Arc::new(UnboundedInner { state: AtomicUsize::new(INIT_STATE), message_queue: Queue::new(), @@ -407,13 +398,9 @@ pub fn unbounded() -> (UnboundedSender, UnboundedReceiver) { recv_task: AtomicWaker::new(), }); - let tx = UnboundedSenderInner { - inner: inner.clone(), - }; + let tx = UnboundedSenderInner { inner: inner.clone() }; - let rx = UnboundedReceiver { - inner: Some(inner), - }; + let rx = UnboundedReceiver { inner: Some(inner) }; (UnboundedSender(Some(tx)), rx) } @@ -430,13 +417,10 @@ impl UnboundedSenderInner { if state.is_open { Poll::Ready(Ok(())) } else { - Poll::Ready(Err(SendError { - kind: SendErrorKind::Disconnected, - })) + Poll::Ready(Err(SendError { kind: SendErrorKind::Disconnected })) } } - // Push message to the queue and signal to the receiver fn queue_push_and_signal(&self, msg: T) { // Push the message onto the message queue @@ -462,16 +446,17 @@ impl UnboundedSenderInner { // This probably is never hit? Odds are the process will run out of // memory first. It may be worth to return something else in this // case? - assert!(state.num_messages < MAX_CAPACITY, "buffer space \ - exhausted; sending this messages would overflow the state"); + assert!( + state.num_messages < MAX_CAPACITY, + "buffer space \ + exhausted; sending this messages would overflow the state" + ); state.num_messages += 1; let next = encode_state(&state); match self.inner.state.compare_exchange(curr, next, SeqCst, SeqCst) { - Ok(_) => { - return Some(state.num_messages) - } + Ok(_) => return Some(state.num_messages), Err(actual) => curr = actual, } } @@ -516,12 +501,7 @@ impl BoundedSenderInner { fn try_send(&mut self, msg: T) -> Result<(), TrySendError> { // If the sender is currently blocked, reject the message if !self.poll_unparked(None).is_ready() { - return Err(TrySendError { - err: SendError { - kind: SendErrorKind::Full, - }, - val: msg, - }); + return Err(TrySendError { err: SendError { kind: SendErrorKind::Full }, val: msg }); } // The channel has capacity to accept the message, so send it @@ -531,9 +511,7 @@ impl BoundedSenderInner { // Do the send without failing. // Can be called only by bounded sender. #[allow(clippy::debug_assert_with_mut_call)] - fn do_send_b(&mut self, msg: T) - -> Result<(), TrySendError> - { + fn do_send_b(&mut self, msg: T) -> Result<(), TrySendError> { // Anyone callig do_send *should* make sure there is room first, // but assert here for tests as a sanity check. debug_assert!(self.poll_unparked(None).is_ready()); @@ -551,12 +529,12 @@ impl BoundedSenderInner { // the configured buffer size num_messages > self.inner.buffer } - None => return Err(TrySendError { - err: SendError { - kind: SendErrorKind::Disconnected, - }, - val: msg, - }), + None => { + return Err(TrySendError { + err: SendError { kind: SendErrorKind::Disconnected }, + val: msg, + }) + } }; // If the channel has reached capacity, then the sender task needs to @@ -600,16 +578,17 @@ impl BoundedSenderInner { // This probably is never hit? Odds are the process will run out of // memory first. It may be worth to return something else in this // case? - assert!(state.num_messages < MAX_CAPACITY, "buffer space \ - exhausted; sending this messages would overflow the state"); + assert!( + state.num_messages < MAX_CAPACITY, + "buffer space \ + exhausted; sending this messages would overflow the state" + ); state.num_messages += 1; let next = encode_state(&state); match self.inner.state.compare_exchange(curr, next, SeqCst, SeqCst) { - Ok(_) => { - return Some(state.num_messages) - } + Ok(_) => return Some(state.num_messages), Err(actual) => curr = actual, } } @@ -644,15 +623,10 @@ impl BoundedSenderInner { /// capacity, in which case the current task is queued to be notified once /// capacity is available; /// - `Poll::Ready(Err(SendError))` if the receiver has been dropped. - fn poll_ready( - &mut self, - cx: &mut Context<'_>, - ) -> Poll> { + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { let state = decode_state(self.inner.state.load(SeqCst)); if !state.is_open { - return Poll::Ready(Err(SendError { - kind: SendErrorKind::Disconnected, - })); + return Poll::Ready(Err(SendError { kind: SendErrorKind::Disconnected })); } self.poll_unparked(Some(cx)).map(Ok) @@ -699,7 +673,7 @@ impl BoundedSenderInner { if !task.is_parked { self.maybe_parked = false; - return Poll::Ready(()) + return Poll::Ready(()); } // At this point, an unpark request is pending, so there will be an @@ -724,12 +698,7 @@ impl Sender { if let Some(inner) = &mut self.0 { inner.try_send(msg) } else { - Err(TrySendError { - err: SendError { - kind: SendErrorKind::Disconnected, - }, - val: msg, - }) + Err(TrySendError { err: SendError { kind: SendErrorKind::Disconnected }, val: msg }) } } @@ -739,8 +708,7 @@ impl Sender { /// [`poll_ready`](Sender::poll_ready) has reported that the channel is /// ready to receive a message. pub fn start_send(&mut self, msg: T) -> Result<(), SendError> { - self.try_send(msg) - .map_err(|e| e.err) + self.try_send(msg).map_err(|e| e.err) } /// Polls the channel to determine if there is guaranteed capacity to send @@ -755,13 +723,8 @@ impl Sender { /// capacity, in which case the current task is queued to be notified once /// capacity is available; /// - `Poll::Ready(Err(SendError))` if the receiver has been dropped. - pub fn poll_ready( - &mut self, - cx: &mut Context<'_>, - ) -> Poll> { - let inner = self.0.as_mut().ok_or(SendError { - kind: SendErrorKind::Disconnected, - })?; + pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + let inner = self.0.as_mut().ok_or(SendError { kind: SendErrorKind::Disconnected })?; inner.poll_ready(cx) } @@ -799,7 +762,10 @@ impl Sender { } /// Hashes the receiver into the provided hasher - pub fn hash_receiver(&self, hasher: &mut H) where H: std::hash::Hasher { + pub fn hash_receiver(&self, hasher: &mut H) + where + H: std::hash::Hasher, + { use std::hash::Hash; let ptr = self.0.as_ref().map(|inner| inner.ptr()); @@ -809,13 +775,8 @@ impl Sender { impl UnboundedSender { /// Check if the channel is ready to receive a message. - pub fn poll_ready( - &self, - _: &mut Context<'_>, - ) -> Poll> { - let inner = self.0.as_ref().ok_or(SendError { - kind: SendErrorKind::Disconnected, - })?; + pub fn poll_ready(&self, _: &mut Context<'_>) -> Poll> { + let inner = self.0.as_ref().ok_or(SendError { kind: SendErrorKind::Disconnected })?; inner.poll_ready_nb() } @@ -845,12 +806,7 @@ impl UnboundedSender { } } - Err(TrySendError { - err: SendError { - kind: SendErrorKind::Disconnected, - }, - val: msg, - }) + Err(TrySendError { err: SendError { kind: SendErrorKind::Disconnected }, val: msg }) } /// Send a message on the channel. @@ -858,8 +814,7 @@ impl UnboundedSender { /// This method should only be called after `poll_ready` has been used to /// verify that the channel is ready to receive a message. pub fn start_send(&mut self, msg: T) -> Result<(), SendError> { - self.do_send_nb(msg) - .map_err(|e| e.err) + self.do_send_nb(msg).map_err(|e| e.err) } /// Sends a message along this channel. @@ -888,7 +843,10 @@ impl UnboundedSender { } /// Hashes the receiver into the provided hasher - pub fn hash_receiver(&self, hasher: &mut H) where H: std::hash::Hasher { + pub fn hash_receiver(&self, hasher: &mut H) + where + H: std::hash::Hasher, + { use std::hash::Hash; let ptr = self.0.as_ref().map(|inner| inner.ptr()); @@ -928,9 +886,7 @@ impl Clone for UnboundedSenderInner { Ok(_) => { // The ABA problem doesn't matter here. We only care that the // number of senders never exceeds the maximum. - return Self { - inner: self.inner.clone(), - }; + return Self { inner: self.inner.clone() }; } Err(actual) => curr = actual, } @@ -1027,9 +983,7 @@ impl Receiver { /// * `Err(e)` when there are no messages available, but channel is not yet closed pub fn try_next(&mut self) -> Result, TryRecvError> { match self.next_message() { - Poll::Ready(msg) => { - Ok(msg) - }, + Poll::Ready(msg) => Ok(msg), Poll::Pending => Err(TryRecvError { _priv: () }), } } @@ -1103,18 +1057,15 @@ impl FusedStream for Receiver { impl Stream for Receiver { type Item = T; - fn poll_next( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { - // Try to read a message off of the message queue. + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + // Try to read a message off of the message queue. match self.next_message() { Poll::Ready(msg) => { if msg.is_none() { self.inner = None; } Poll::Ready(msg) - }, + } Poll::Pending => { // There are no messages to read, in this case, park. self.inner.as_ref().unwrap().recv_task.register(cx.waker()); @@ -1180,9 +1131,7 @@ impl UnboundedReceiver { /// * `Err(e)` when there are no messages available, but channel is not yet closed pub fn try_next(&mut self) -> Result, TryRecvError> { match self.next_message() { - Poll::Ready(msg) => { - Ok(msg) - }, + Poll::Ready(msg) => Ok(msg), Poll::Pending => Err(TryRecvError { _priv: () }), } } @@ -1240,10 +1189,7 @@ impl FusedStream for UnboundedReceiver { impl Stream for UnboundedReceiver { type Item = T; - fn poll_next( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { // Try to read a message off of the message queue. match self.next_message() { Poll::Ready(msg) => { @@ -1251,7 +1197,7 @@ impl Stream for UnboundedReceiver { self.inner = None; } Poll::Ready(msg) - }, + } Poll::Pending => { // There are no messages to read, in this case, park. self.inner.as_ref().unwrap().recv_task.register(cx.waker()); @@ -1349,10 +1295,7 @@ impl State { */ fn decode_state(num: usize) -> State { - State { - is_open: num & OPEN_MASK == OPEN_MASK, - num_messages: num & MAX_CAPACITY, - } + State { is_open: num & OPEN_MASK == OPEN_MASK, num_messages: num & MAX_CAPACITY } } fn encode_state(state: &State) -> usize { diff --git a/futures-channel/src/mpsc/queue.rs b/futures-channel/src/mpsc/queue.rs index b00e1b17..57dc7f56 100644 --- a/futures-channel/src/mpsc/queue.rs +++ b/futures-channel/src/mpsc/queue.rs @@ -43,10 +43,10 @@ pub(super) use self::PopResult::*; -use std::thread; use std::cell::UnsafeCell; use std::ptr; use std::sync::atomic::{AtomicPtr, Ordering}; +use std::thread; /// A result of the `pop` function. pub(super) enum PopResult { @@ -76,15 +76,12 @@ pub(super) struct Queue { tail: UnsafeCell<*mut Node>, } -unsafe impl Send for Queue { } -unsafe impl Sync for Queue { } +unsafe impl Send for Queue {} +unsafe impl Sync for Queue {} impl Node { unsafe fn new(v: Option) -> *mut Self { - Box::into_raw(Box::new(Self { - next: AtomicPtr::new(ptr::null_mut()), - value: v, - })) + Box::into_raw(Box::new(Self { next: AtomicPtr::new(ptr::null_mut()), value: v })) } } @@ -93,10 +90,7 @@ impl Queue { /// one consumer. pub(super) fn new() -> Self { let stub = unsafe { Node::new(None) }; - Self { - head: AtomicPtr::new(stub), - tail: UnsafeCell::new(stub), - } + Self { head: AtomicPtr::new(stub), tail: UnsafeCell::new(stub) } } /// Pushes a new value onto this queue. @@ -133,7 +127,11 @@ impl Queue { return Data(ret); } - if self.head.load(Ordering::Acquire) == tail {Empty} else {Inconsistent} + if self.head.load(Ordering::Acquire) == tail { + Empty + } else { + Inconsistent + } } /// Pop an element similarly to `pop` function, but spin-wait on inconsistent diff --git a/futures-channel/src/mpsc/sink_impl.rs b/futures-channel/src/mpsc/sink_impl.rs index 4ce66b4e..1be20162 100644 --- a/futures-channel/src/mpsc/sink_impl.rs +++ b/futures-channel/src/mpsc/sink_impl.rs @@ -6,24 +6,15 @@ use std::pin::Pin; impl Sink for Sender { type Error = SendError; - fn poll_ready( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { + fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { (*self).poll_ready(cx) } - fn start_send( - mut self: Pin<&mut Self>, - msg: T, - ) -> Result<(), Self::Error> { + fn start_send(mut self: Pin<&mut Self>, msg: T) -> Result<(), Self::Error> { (*self).start_send(msg) } - fn poll_flush( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { match (*self).poll_ready(cx) { Poll::Ready(Err(ref e)) if e.is_disconnected() => { // If the receiver disconnected, we consider the sink to be flushed. @@ -33,10 +24,7 @@ impl Sink for Sender { } } - fn poll_close( - mut self: Pin<&mut Self>, - _: &mut Context<'_>, - ) -> Poll> { + fn poll_close(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { self.disconnect(); Poll::Ready(Ok(())) } @@ -45,31 +33,19 @@ impl Sink for Sender { impl Sink for UnboundedSender { type Error = SendError; - fn poll_ready( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { + fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { Self::poll_ready(&*self, cx) } - fn start_send( - mut self: Pin<&mut Self>, - msg: T, - ) -> Result<(), Self::Error> { + fn start_send(mut self: Pin<&mut Self>, msg: T) -> Result<(), Self::Error> { Self::start_send(&mut *self, msg) } - fn poll_flush( - self: Pin<&mut Self>, - _: &mut Context<'_>, - ) -> Poll> { + fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { Poll::Ready(Ok(())) } - fn poll_close( - mut self: Pin<&mut Self>, - _: &mut Context<'_>, - ) -> Poll> { + fn poll_close(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { self.disconnect(); Poll::Ready(Ok(())) } @@ -78,29 +54,19 @@ impl Sink for UnboundedSender { impl Sink for &UnboundedSender { type Error = SendError; - fn poll_ready( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { + fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { UnboundedSender::poll_ready(*self, cx) } fn start_send(self: Pin<&mut Self>, msg: T) -> Result<(), Self::Error> { - self.unbounded_send(msg) - .map_err(TrySendError::into_send_error) + self.unbounded_send(msg).map_err(TrySendError::into_send_error) } - fn poll_flush( - self: Pin<&mut Self>, - _: &mut Context<'_>, - ) -> Poll> { + fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { Poll::Ready(Ok(())) } - fn poll_close( - self: Pin<&mut Self>, - _: &mut Context<'_>, - ) -> Poll> { + fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { self.close_channel(); Poll::Ready(Ok(())) } diff --git a/futures-channel/src/oneshot.rs b/futures-channel/src/oneshot.rs index 1c393c5a..5af651b9 100644 --- a/futures-channel/src/oneshot.rs +++ b/futures-channel/src/oneshot.rs @@ -7,7 +7,7 @@ use core::fmt; use core::pin::Pin; use core::sync::atomic::AtomicBool; use core::sync::atomic::Ordering::SeqCst; -use futures_core::future::{Future, FusedFuture}; +use futures_core::future::{FusedFuture, Future}; use futures_core::task::{Context, Poll, Waker}; use crate::lock::Lock; @@ -103,12 +103,8 @@ struct Inner { /// ``` pub fn channel() -> (Sender, Receiver) { let inner = Arc::new(Inner::new()); - let receiver = Receiver { - inner: inner.clone(), - }; - let sender = Sender { - inner, - }; + let receiver = Receiver { inner: inner.clone() }; + let sender = Sender { inner }; (sender, receiver) } @@ -124,7 +120,7 @@ impl Inner { fn send(&self, t: T) -> Result<(), T> { if self.complete.load(SeqCst) { - return Err(t) + return Err(t); } // Note that this lock acquisition may fail if the receiver @@ -161,7 +157,7 @@ impl Inner { // destructor, but our destructor hasn't run yet so if it's set then the // oneshot is gone. if self.complete.load(SeqCst) { - return Poll::Ready(()) + return Poll::Ready(()); } // If our other half is not gone then we need to park our current task @@ -270,7 +266,10 @@ impl Inner { } else { let task = cx.waker().clone(); match self.rx_task.try_lock() { - Some(mut slot) => { *slot = Some(task); false }, + Some(mut slot) => { + *slot = Some(task); + false + } None => true, } }; @@ -456,10 +455,7 @@ impl Receiver { impl Future for Receiver { type Output = Result; - fn poll( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { self.inner.recv(cx) } } diff --git a/futures-task/src/waker.rs b/futures-task/src/waker.rs index 265a445d..a7310a07 100644 --- a/futures-task/src/waker.rs +++ b/futures-task/src/waker.rs @@ -1,7 +1,7 @@ use super::arc_wake::ArcWake; -use core::mem; -use core::task::{Waker, RawWaker, RawWakerVTable}; use alloc::sync::Arc; +use core::mem; +use core::task::{RawWaker, RawWakerVTable, Waker}; pub(super) fn waker_vtable() -> &'static RawWakerVTable { &RawWakerVTable::new( @@ -22,9 +22,7 @@ where { let ptr = Arc::into_raw(wake) as *const (); - unsafe { - Waker::from_raw(RawWaker::new(ptr, waker_vtable::())) - } + unsafe { Waker::from_raw(RawWaker::new(ptr, waker_vtable::())) } } // FIXME: panics on Arc::clone / refcount changes could wreak havoc on the diff --git a/futures-task/src/waker_ref.rs b/futures-task/src/waker_ref.rs index 76d849ac..791c6901 100644 --- a/futures-task/src/waker_ref.rs +++ b/futures-task/src/waker_ref.rs @@ -1,10 +1,10 @@ -use super::arc_wake::{ArcWake}; +use super::arc_wake::ArcWake; use super::waker::waker_vtable; use alloc::sync::Arc; -use core::mem::ManuallyDrop; use core::marker::PhantomData; +use core::mem::ManuallyDrop; use core::ops::Deref; -use core::task::{Waker, RawWaker}; +use core::task::{RawWaker, Waker}; /// A [`Waker`] that is only valid for a given lifetime. /// @@ -22,10 +22,7 @@ impl<'a> WakerRef<'a> { // copy the underlying (raw) waker without calling a clone, // as we won't call Waker::drop either. let waker = ManuallyDrop::new(unsafe { core::ptr::read(waker) }); - Self { - waker, - _marker: PhantomData, - } + Self { waker, _marker: PhantomData } } /// Create a new [`WakerRef`] from a [`Waker`] that must not be dropped. @@ -35,10 +32,7 @@ impl<'a> WakerRef<'a> { /// by the caller), and the [`Waker`] doesn't need to or must not be /// destroyed. pub fn new_unowned(waker: ManuallyDrop) -> Self { - Self { - waker, - _marker: PhantomData, - } + Self { waker, _marker: PhantomData } } } @@ -57,14 +51,13 @@ impl Deref for WakerRef<'_> { #[inline] pub fn waker_ref(wake: &Arc) -> WakerRef<'_> where - W: ArcWake + W: ArcWake, { // simply copy the pointer instead of using Arc::into_raw, // as we don't actually keep a refcount by using ManuallyDrop.< let ptr = (&**wake as *const W) as *const (); - let waker = ManuallyDrop::new(unsafe { - Waker::from_raw(RawWaker::new(ptr, waker_vtable::())) - }); + let waker = + ManuallyDrop::new(unsafe { Waker::from_raw(RawWaker::new(ptr, waker_vtable::())) }); WakerRef::new_unowned(waker) } diff --git a/futures-util/benches_disabled/bilock.rs b/futures-util/benches_disabled/bilock.rs index 48afe3c5..417f75d3 100644 --- a/futures-util/benches_disabled/bilock.rs +++ b/futures-util/benches_disabled/bilock.rs @@ -2,125 +2,121 @@ #[cfg(feature = "bilock")] mod bench { -use futures::task::{Context, Waker}; -use futures::executor::LocalPool; -use futures_util::lock::BiLock; -use futures_util::lock::BiLockAcquire; -use futures_util::lock::BiLockAcquired; -use futures_util::task::ArcWake; + use futures::executor::LocalPool; + use futures::task::{Context, Waker}; + use futures_util::lock::BiLock; + use futures_util::lock::BiLockAcquire; + use futures_util::lock::BiLockAcquired; + use futures_util::task::ArcWake; -use std::sync::Arc; -use test::Bencher; + use std::sync::Arc; + use test::Bencher; -fn notify_noop() -> Waker { - struct Noop; + fn notify_noop() -> Waker { + struct Noop; - impl ArcWake for Noop { - fn wake(_: &Arc) {} + impl ArcWake for Noop { + fn wake(_: &Arc) {} + } + + ArcWake::into_waker(Arc::new(Noop)) } - ArcWake::into_waker(Arc::new(Noop)) -} + /// Pseudo-stream which simply calls `lock.poll()` on `poll` + struct LockStream { + lock: BiLockAcquire, + } + impl LockStream { + fn new(lock: BiLock) -> Self { + Self { lock: lock.lock() } + } -/// Pseudo-stream which simply calls `lock.poll()` on `poll` -struct LockStream { - lock: BiLockAcquire, -} - -impl LockStream { - fn new(lock: BiLock) -> Self { - Self { - lock: lock.lock() + /// Release a lock after it was acquired in `poll`, + /// so `poll` could be called again. + fn release_lock(&mut self, guard: BiLockAcquired) { + self.lock = guard.unlock().lock() } } - /// Release a lock after it was acquired in `poll`, - /// so `poll` could be called again. - fn release_lock(&mut self, guard: BiLockAcquired) { - self.lock = guard.unlock().lock() + impl Stream for LockStream { + type Item = BiLockAcquired; + type Error = (); + + fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll, Self::Error> { + self.lock.poll(cx).map(|a| a.map(Some)) + } + } + + #[bench] + fn contended(b: &mut Bencher) { + let pool = LocalPool::new(); + let mut exec = pool.executor(); + let waker = notify_noop(); + let mut map = task::LocalMap::new(); + let mut waker = task::Context::new(&mut map, &waker, &mut exec); + + b.iter(|| { + let (x, y) = BiLock::new(1); + + let mut x = LockStream::new(x); + let mut y = LockStream::new(y); + + for _ in 0..1000 { + let x_guard = match x.poll_next(&mut waker) { + Ok(Poll::Ready(Some(guard))) => guard, + _ => panic!(), + }; + + // Try poll second lock while first lock still holds the lock + match y.poll_next(&mut waker) { + Ok(Poll::Pending) => (), + _ => panic!(), + }; + + x.release_lock(x_guard); + + let y_guard = match y.poll_next(&mut waker) { + Ok(Poll::Ready(Some(guard))) => guard, + _ => panic!(), + }; + + y.release_lock(y_guard); + } + (x, y) + }); + } + + #[bench] + fn lock_unlock(b: &mut Bencher) { + let pool = LocalPool::new(); + let mut exec = pool.executor(); + let waker = notify_noop(); + let mut map = task::LocalMap::new(); + let mut waker = task::Context::new(&mut map, &waker, &mut exec); + + b.iter(|| { + let (x, y) = BiLock::new(1); + + let mut x = LockStream::new(x); + let mut y = LockStream::new(y); + + for _ in 0..1000 { + let x_guard = match x.poll_next(&mut waker) { + Ok(Poll::Ready(Some(guard))) => guard, + _ => panic!(), + }; + + x.release_lock(x_guard); + + let y_guard = match y.poll_next(&mut waker) { + Ok(Poll::Ready(Some(guard))) => guard, + _ => panic!(), + }; + + y.release_lock(y_guard); + } + (x, y) + }) } } - -impl Stream for LockStream { - type Item = BiLockAcquired; - type Error = (); - - fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll, Self::Error> { - self.lock.poll(cx).map(|a| a.map(Some)) - } -} - - -#[bench] -fn contended(b: &mut Bencher) { - let pool = LocalPool::new(); - let mut exec = pool.executor(); - let waker = notify_noop(); - let mut map = task::LocalMap::new(); - let mut waker = task::Context::new(&mut map, &waker, &mut exec); - - b.iter(|| { - let (x, y) = BiLock::new(1); - - let mut x = LockStream::new(x); - let mut y = LockStream::new(y); - - for _ in 0..1000 { - let x_guard = match x.poll_next(&mut waker) { - Ok(Poll::Ready(Some(guard))) => guard, - _ => panic!(), - }; - - // Try poll second lock while first lock still holds the lock - match y.poll_next(&mut waker) { - Ok(Poll::Pending) => (), - _ => panic!(), - }; - - x.release_lock(x_guard); - - let y_guard = match y.poll_next(&mut waker) { - Ok(Poll::Ready(Some(guard))) => guard, - _ => panic!(), - }; - - y.release_lock(y_guard); - } - (x, y) - }); -} - -#[bench] -fn lock_unlock(b: &mut Bencher) { - let pool = LocalPool::new(); - let mut exec = pool.executor(); - let waker = notify_noop(); - let mut map = task::LocalMap::new(); - let mut waker = task::Context::new(&mut map, &waker, &mut exec); - - b.iter(|| { - let (x, y) = BiLock::new(1); - - let mut x = LockStream::new(x); - let mut y = LockStream::new(y); - - for _ in 0..1000 { - let x_guard = match x.poll_next(&mut waker) { - Ok(Poll::Ready(Some(guard))) => guard, - _ => panic!(), - }; - - x.release_lock(x_guard); - - let y_guard = match y.poll_next(&mut waker) { - Ok(Poll::Ready(Some(guard))) => guard, - _ => panic!(), - }; - - y.release_lock(y_guard); - } - (x, y) - }) -} -} diff --git a/futures-util/src/future/abortable.rs b/futures-util/src/future/abortable.rs index 3f2e5a06..198cc8e6 100644 --- a/futures-util/src/future/abortable.rs +++ b/futures-util/src/future/abortable.rs @@ -1,11 +1,11 @@ use super::assert_future; use crate::task::AtomicWaker; -use futures_core::future::Future; -use futures_core::task::{Context, Poll}; +use alloc::sync::Arc; use core::fmt; use core::pin::Pin; use core::sync::atomic::{AtomicBool, Ordering}; -use alloc::sync::Arc; +use futures_core::future::Future; +use futures_core::task::{Context, Poll}; use pin_project_lite::pin_project; pin_project! { @@ -19,7 +19,10 @@ pin_project! { } } -impl Abortable where Fut: Future { +impl Abortable +where + Fut: Future, +{ /// Creates a new `Abortable` future using an existing `AbortRegistration`. /// `AbortRegistration`s can be acquired through `AbortHandle::new`. /// @@ -40,10 +43,7 @@ impl Abortable where Fut: Future { /// # }); /// ``` pub fn new(future: Fut, reg: AbortRegistration) -> Self { - assert_future::, _>(Self { - future, - inner: reg.inner, - }) + assert_future::, _>(Self { future, inner: reg.inner }) } } @@ -80,19 +80,10 @@ impl AbortHandle { /// # }); /// ``` pub fn new_pair() -> (Self, AbortRegistration) { - let inner = Arc::new(AbortInner { - waker: AtomicWaker::new(), - cancel: AtomicBool::new(false), - }); + let inner = + Arc::new(AbortInner { waker: AtomicWaker::new(), cancel: AtomicBool::new(false) }); - ( - Self { - inner: inner.clone(), - }, - AbortRegistration { - inner, - }, - ) + (Self { inner: inner.clone() }, AbortRegistration { inner }) } } @@ -112,13 +103,11 @@ struct AbortInner { /// This function is only available when the `std` or `alloc` feature of this /// library is activated, and it is activated by default. pub fn abortable(future: Fut) -> (Abortable, AbortHandle) - where Fut: Future +where + Fut: Future, { let (handle, reg) = AbortHandle::new_pair(); - ( - Abortable::new(future, reg), - handle, - ) + (Abortable::new(future, reg), handle) } /// Indicator that the `Abortable` future was aborted. @@ -134,18 +123,21 @@ impl fmt::Display for Aborted { #[cfg(feature = "std")] impl std::error::Error for Aborted {} -impl Future for Abortable where Fut: Future { +impl Future for Abortable +where + Fut: Future, +{ type Output = Result; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { // Check if the future has been aborted if self.inner.cancel.load(Ordering::Relaxed) { - return Poll::Ready(Err(Aborted)) + return Poll::Ready(Err(Aborted)); } // attempt to complete the future if let Poll::Ready(x) = self.as_mut().project().future.poll(cx) { - return Poll::Ready(Ok(x)) + return Poll::Ready(Ok(x)); } // Register to receive a wakeup if the future is aborted in the... future @@ -156,7 +148,7 @@ impl Future for Abortable where Fut: Future { // Checking with `Relaxed` is sufficient because `register` introduces an // `AcqRel` barrier. if self.inner.cancel.load(Ordering::Relaxed) { - return Poll::Ready(Err(Aborted)) + return Poll::Ready(Err(Aborted)); } Poll::Pending diff --git a/futures-util/src/lock/bilock.rs b/futures-util/src/lock/bilock.rs index 600e16e4..2f51ae7c 100644 --- a/futures-util/src/lock/bilock.rs +++ b/futures-util/src/lock/bilock.rs @@ -1,16 +1,16 @@ //! Futures-powered synchronization primitives. -#[cfg(feature = "bilock")] -use futures_core::future::Future; -use futures_core::task::{Context, Poll, Waker}; +use alloc::boxed::Box; +use alloc::sync::Arc; use core::cell::UnsafeCell; use core::fmt; use core::ops::{Deref, DerefMut}; use core::pin::Pin; use core::sync::atomic::AtomicUsize; use core::sync::atomic::Ordering::SeqCst; -use alloc::boxed::Box; -use alloc::sync::Arc; +#[cfg(feature = "bilock")] +use futures_core::future::Future; +use futures_core::task::{Context, Poll, Waker}; /// A type of futures-powered synchronization primitive which is a mutex between /// two possible owners. @@ -61,10 +61,7 @@ impl BiLock { /// Similarly, reuniting the lock and extracting the inner value is only /// possible when `T` is `Unpin`. pub fn new(t: T) -> (Self, Self) { - let arc = Arc::new(Inner { - state: AtomicUsize::new(0), - value: Some(UnsafeCell::new(t)), - }); + let arc = Arc::new(Inner { state: AtomicUsize::new(0), value: Some(UnsafeCell::new(t)) }); (Self { arc: arc.clone() }, Self { arc }) } @@ -103,11 +100,11 @@ impl BiLock { let mut prev = Box::from_raw(n as *mut Waker); *prev = cx.waker().clone(); waker = Some(prev); - } + }, } // type ascription for safety's sake! - let me: Box = waker.take().unwrap_or_else(||Box::new(cx.waker().clone())); + let me: Box = waker.take().unwrap_or_else(|| Box::new(cx.waker().clone())); let me = Box::into_raw(me) as usize; match self.arc.state.compare_exchange(1, me, SeqCst, SeqCst) { @@ -145,9 +142,7 @@ impl BiLock { #[cfg(feature = "bilock")] #[cfg_attr(docsrs, doc(cfg(feature = "bilock")))] pub fn lock(&self) -> BiLockAcquire<'_, T> { - BiLockAcquire { - bilock: self, - } + BiLockAcquire { bilock: self } } /// Attempts to put the two "halves" of a `BiLock` back together and @@ -181,7 +176,7 @@ impl BiLock { // up as its now their turn. n => unsafe { Box::from_raw(n as *mut Waker).wake(); - } + }, } } } @@ -205,9 +200,7 @@ pub struct ReuniteError(pub BiLock, pub BiLock); impl fmt::Debug for ReuniteError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_tuple("ReuniteError") - .field(&"...") - .finish() + f.debug_tuple("ReuniteError").field(&"...").finish() } } diff --git a/futures-util/src/lock/mutex.rs b/futures-util/src/lock/mutex.rs index a78de628..a849aeeb 100644 --- a/futures-util/src/lock/mutex.rs +++ b/futures-util/src/lock/mutex.rs @@ -1,13 +1,13 @@ use futures_core::future::{FusedFuture, Future}; use futures_core::task::{Context, Poll, Waker}; use slab::Slab; -use std::{fmt, mem}; use std::cell::UnsafeCell; use std::marker::PhantomData; use std::ops::{Deref, DerefMut}; use std::pin::Pin; -use std::sync::Mutex as StdMutex; use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Mutex as StdMutex; +use std::{fmt, mem}; /// A futures-aware mutex. /// @@ -53,7 +53,7 @@ enum Waiter { impl Waiter { fn register(&mut self, waker: &Waker) { match self { - Self::Waiting(w) if waker.will_wake(w) => {}, + Self::Waiting(w) if waker.will_wake(w) => {} _ => *self = Self::Waiting(waker.clone()), } } @@ -61,7 +61,7 @@ impl Waiter { fn wake(&mut self) { match mem::replace(self, Self::Woken) { Self::Waiting(waker) => waker.wake(), - Self::Woken => {}, + Self::Woken => {} } } } @@ -113,10 +113,7 @@ impl Mutex { /// This method returns a future that will resolve once the lock has been /// successfully acquired. pub fn lock(&self) -> MutexLockFuture<'_, T> { - MutexLockFuture { - mutex: Some(self), - wait_key: WAIT_KEY_NONE, - } + MutexLockFuture { mutex: Some(self), wait_key: WAIT_KEY_NONE } } /// Returns a mutable reference to the underlying data. @@ -145,7 +142,7 @@ impl Mutex { if wait_key != WAIT_KEY_NONE { let mut waiters = self.waiters.lock().unwrap(); match waiters.remove(wait_key) { - Waiter::Waiting(_) => {}, + Waiter::Waiting(_) => {} Waiter::Woken => { // We were awoken, but then dropped before we could // wake up to acquire the lock. Wake up another @@ -191,13 +188,10 @@ impl fmt::Debug for MutexLockFuture<'_, T> { f.debug_struct("MutexLockFuture") .field("was_acquired", &self.mutex.is_none()) .field("mutex", &self.mutex) - .field("wait_key", &( - if self.wait_key == WAIT_KEY_NONE { - None - } else { - Some(self.wait_key) - } - )) + .field( + "wait_key", + &(if self.wait_key == WAIT_KEY_NONE { None } else { Some(self.wait_key) }), + ) .finish() } } @@ -295,10 +289,7 @@ impl<'a, T: ?Sized> MutexGuard<'a, T> { impl fmt::Debug for MutexGuard<'_, T> { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("MutexGuard") - .field("value", &&**self) - .field("mutex", &self.mutex) - .finish() + f.debug_struct("MutexGuard").field("value", &&**self).field("mutex", &self.mutex).finish() } } diff --git a/futures-util/src/stream/futures_ordered.rs b/futures-util/src/stream/futures_ordered.rs index eda3b270..f596b3b0 100644 --- a/futures-util/src/stream/futures_ordered.rs +++ b/futures-util/src/stream/futures_ordered.rs @@ -52,10 +52,7 @@ where fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let index = self.index; - self.project().data.poll(cx).map(|output| OrderWrapper { - data: output, - index, - }) + self.project().data.poll(cx).map(|output| OrderWrapper { data: output, index }) } } @@ -139,10 +136,7 @@ impl FuturesOrdered { /// must ensure that `FuturesOrdered::poll` is called in order to receive /// task notifications. pub fn push(&mut self, future: Fut) { - let wrapped = OrderWrapper { - data: future, - index: self.next_incoming_index, - }; + let wrapped = OrderWrapper { data: future, index: self.next_incoming_index }; self.next_incoming_index += 1; self.in_progress_queue.push(wrapped); } diff --git a/futures-util/src/stream/futures_unordered/mod.rs b/futures-util/src/stream/futures_unordered/mod.rs index 8dcc551e..d1377ff3 100644 --- a/futures-util/src/stream/futures_unordered/mod.rs +++ b/futures-util/src/stream/futures_unordered/mod.rs @@ -3,11 +3,8 @@ //! This module is only available when the `std` or `alloc` feature of this //! library is activated, and it is activated by default. -use futures_core::future::Future; -use futures_core::stream::{FusedStream, Stream}; -use futures_core::task::{Context, Poll}; -use futures_task::{FutureObj, LocalFutureObj, Spawn, LocalSpawn, SpawnError}; use crate::task::AtomicWaker; +use alloc::sync::{Arc, Weak}; use core::cell::UnsafeCell; use core::fmt::{self, Debug}; use core::iter::FromIterator; @@ -16,8 +13,11 @@ use core::mem; use core::pin::Pin; use core::ptr; use core::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release, SeqCst}; -use core::sync::atomic::{AtomicPtr, AtomicBool}; -use alloc::sync::{Arc, Weak}; +use core::sync::atomic::{AtomicBool, AtomicPtr}; +use futures_core::future::Future; +use futures_core::stream::{FusedStream, Stream}; +use futures_core::task::{Context, Poll}; +use futures_task::{FutureObj, LocalFutureObj, LocalSpawn, Spawn, SpawnError}; mod abort; @@ -28,8 +28,7 @@ mod task; use self::task::Task; mod ready_to_run_queue; -use self::ready_to_run_queue::{ReadyToRunQueue, Dequeue}; - +use self::ready_to_run_queue::{Dequeue, ReadyToRunQueue}; /// A set of futures which may complete in any order. /// @@ -63,18 +62,14 @@ unsafe impl Sync for FuturesUnordered {} impl Unpin for FuturesUnordered {} impl Spawn for FuturesUnordered> { - fn spawn_obj(&self, future_obj: FutureObj<'static, ()>) - -> Result<(), SpawnError> - { + fn spawn_obj(&self, future_obj: FutureObj<'static, ()>) -> Result<(), SpawnError> { self.push(future_obj); Ok(()) } } impl LocalSpawn for FuturesUnordered> { - fn spawn_local_obj(&self, future_obj: LocalFutureObj<'static, ()>) - -> Result<(), SpawnError> - { + fn spawn_local_obj(&self, future_obj: LocalFutureObj<'static, ()>) -> Result<(), SpawnError> { self.push(future_obj); Ok(()) } @@ -191,7 +186,10 @@ impl FuturesUnordered { } /// Returns an iterator that allows inspecting each future in the set. - pub fn iter(&self) -> Iter<'_, Fut> where Fut: Unpin { + pub fn iter(&self) -> Iter<'_, Fut> + where + Fut: Unpin, + { Iter(Pin::new(self).iter_pin_ref()) } @@ -199,16 +197,14 @@ impl FuturesUnordered { fn iter_pin_ref(self: Pin<&Self>) -> IterPinRef<'_, Fut> { let (task, len) = self.atomic_load_head_and_len_all(); - IterPinRef { - task, - len, - pending_next_all: self.pending_next_all(), - _marker: PhantomData, - } + IterPinRef { task, len, pending_next_all: self.pending_next_all(), _marker: PhantomData } } /// Returns an iterator that allows modifying each future in the set. - pub fn iter_mut(&mut self) -> IterMut<'_, Fut> where Fut: Unpin { + pub fn iter_mut(&mut self) -> IterMut<'_, Fut> + where + Fut: Unpin, + { IterMut(Pin::new(self).iter_pin_mut()) } @@ -217,19 +213,9 @@ impl FuturesUnordered { // `head_all` can be accessed directly and we don't need to spin on // `Task::next_all` since we have exclusive access to the set. let task = *self.head_all.get_mut(); - let len = if task.is_null() { - 0 - } else { - unsafe { - *(*task).len_all.get() - } - }; + let len = if task.is_null() { 0 } else { unsafe { *(*task).len_all.get() } }; - IterPinMut { - task, - len, - _marker: PhantomData - } + IterPinMut { task, len, _marker: PhantomData } } /// Returns the current head node and number of futures in the list of all @@ -395,9 +381,7 @@ impl FuturesUnordered { impl Stream for FuturesUnordered { type Item = Fut::Output; - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) - -> Poll> - { + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { // Variable to determine how many times it is allowed to poll underlying // futures without yielding. // @@ -469,14 +453,11 @@ impl Stream for FuturesUnordered { // Double check that the call to `release_task` really // happened. Calling it required the task to be unlinked. - debug_assert_eq!( - task.next_all.load(Relaxed), - self.pending_next_all() - ); + debug_assert_eq!(task.next_all.load(Relaxed), self.pending_next_all()); unsafe { debug_assert!((*task.prev_all.get()).is_null()); } - continue + continue; } }; @@ -516,10 +497,7 @@ impl Stream for FuturesUnordered { } } - let mut bomb = Bomb { - task: Some(task), - queue: &mut *self, - }; + let mut bomb = Bomb { task: Some(task), queue: &mut *self }; // Poll the underlying future with the appropriate waker // implementation. This is where a large bit of the unsafety @@ -555,11 +533,9 @@ impl Stream for FuturesUnordered { cx.waker().wake_by_ref(); return Poll::Pending; } - continue - } - Poll::Ready(output) => { - return Poll::Ready(Some(output)) + continue; } + Poll::Ready(output) => return Poll::Ready(Some(output)), } } } @@ -611,7 +587,10 @@ impl FromIterator for FuturesUnordered { I: IntoIterator, { let acc = Self::new(); - iter.into_iter().fold(acc, |acc, item| { acc.push(item); acc }) + iter.into_iter().fold(acc, |acc, item| { + acc.push(item); + acc + }) } } diff --git a/futures-util/src/stream/futures_unordered/ready_to_run_queue.rs b/futures-util/src/stream/futures_unordered/ready_to_run_queue.rs index 21051958..3b34dc6e 100644 --- a/futures-util/src/stream/futures_unordered/ready_to_run_queue.rs +++ b/futures-util/src/stream/futures_unordered/ready_to_run_queue.rs @@ -1,9 +1,9 @@ use crate::task::AtomicWaker; +use alloc::sync::Arc; use core::cell::UnsafeCell; use core::ptr; use core::sync::atomic::AtomicPtr; -use core::sync::atomic::Ordering::{Relaxed, Acquire, Release, AcqRel}; -use alloc::sync::Arc; +use core::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release}; use super::abort::abort; use super::task::Task; diff --git a/futures-util/src/stream/futures_unordered/task.rs b/futures-util/src/stream/futures_unordered/task.rs index 261408f0..da2cd67d 100644 --- a/futures-util/src/stream/futures_unordered/task.rs +++ b/futures-util/src/stream/futures_unordered/task.rs @@ -1,11 +1,11 @@ -use core::cell::UnsafeCell; -use core::sync::atomic::{AtomicPtr, AtomicBool}; -use core::sync::atomic::Ordering::{self, SeqCst}; use alloc::sync::{Arc, Weak}; +use core::cell::UnsafeCell; +use core::sync::atomic::Ordering::{self, SeqCst}; +use core::sync::atomic::{AtomicBool, AtomicPtr}; -use crate::task::{ArcWake, WakerRef, waker_ref}; -use super::ReadyToRunQueue; use super::abort::abort; +use super::ReadyToRunQueue; +use crate::task::{waker_ref, ArcWake, WakerRef}; pub(super) struct Task { // The future diff --git a/futures-util/src/stream/select_all.rs b/futures-util/src/stream/select_all.rs index c0b92faa..6b17bad1 100644 --- a/futures-util/src/stream/select_all.rs +++ b/futures-util/src/stream/select_all.rs @@ -5,11 +5,11 @@ use core::iter::FromIterator; use core::pin::Pin; use futures_core::ready; -use futures_core::stream::{Stream, FusedStream}; +use futures_core::stream::{FusedStream, Stream}; use futures_core::task::{Context, Poll}; use super::assert_stream; -use crate::stream::{StreamExt, StreamFuture, FuturesUnordered}; +use crate::stream::{FuturesUnordered, StreamExt, StreamFuture}; /// An unbounded set of streams /// @@ -75,10 +75,7 @@ impl Default for SelectAll { impl Stream for SelectAll { type Item = St::Item; - fn poll_next( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { loop { match ready!(self.inner.poll_next_unpin(cx)) { Some((Some(item), remaining)) => { @@ -116,8 +113,9 @@ impl FusedStream for SelectAll { /// This function is only available when the `std` or `alloc` feature of this /// library is activated, and it is activated by default. pub fn select_all(streams: I) -> SelectAll - where I: IntoIterator, - I::Item: Stream + Unpin +where + I: IntoIterator, + I::Item: Stream + Unpin, { let mut set = SelectAll::new(); diff --git a/futures-util/src/stream/stream/buffer_unordered.rs b/futures-util/src/stream/stream/buffer_unordered.rs index de42cfd3..d64c142b 100644 --- a/futures-util/src/stream/stream/buffer_unordered.rs +++ b/futures-util/src/stream/stream/buffer_unordered.rs @@ -1,12 +1,12 @@ use crate::stream::{Fuse, FuturesUnordered, StreamExt}; +use core::fmt; +use core::pin::Pin; use futures_core::future::Future; -use futures_core::stream::{Stream, FusedStream}; +use futures_core::stream::{FusedStream, Stream}; use futures_core::task::{Context, Poll}; #[cfg(feature = "sink")] use futures_sink::Sink; use pin_project_lite::pin_project; -use core::fmt; -use core::pin::Pin; pin_project! { /// Stream for the [`buffer_unordered`](super::StreamExt::buffer_unordered) @@ -63,10 +63,7 @@ where { type Item = ::Output; - fn poll_next( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let mut this = self.project(); // First up, try to spawn off as many futures as possible by filling up diff --git a/futures-util/src/stream/stream/buffered.rs b/futures-util/src/stream/stream/buffered.rs index 1af9f492..6052a737 100644 --- a/futures-util/src/stream/stream/buffered.rs +++ b/futures-util/src/stream/stream/buffered.rs @@ -1,4 +1,6 @@ use crate::stream::{Fuse, FuturesOrdered, StreamExt}; +use core::fmt; +use core::pin::Pin; use futures_core::future::Future; use futures_core::ready; use futures_core::stream::Stream; @@ -6,8 +8,6 @@ use futures_core::task::{Context, Poll}; #[cfg(feature = "sink")] use futures_sink::Sink; use pin_project_lite::pin_project; -use core::fmt; -use core::pin::Pin; pin_project! { /// Stream for the [`buffered`](super::StreamExt::buffered) method. @@ -44,11 +44,7 @@ where St::Item: Future, { pub(super) fn new(stream: St, n: usize) -> Self { - Self { - stream: super::Fuse::new(stream), - in_progress_queue: FuturesOrdered::new(), - max: n, - } + Self { stream: super::Fuse::new(stream), in_progress_queue: FuturesOrdered::new(), max: n } } delegate_access_inner!(stream, St, (.)); @@ -61,10 +57,7 @@ where { type Item = ::Output; - fn poll_next( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let mut this = self.project(); // First up, try to spawn off as many futures as possible by filling up @@ -79,7 +72,7 @@ where // Attempt to pull the next value from the in_progress_queue let res = this.in_progress_queue.poll_next_unpin(cx); if let Some(val) = ready!(res) { - return Poll::Ready(Some(val)) + return Poll::Ready(Some(val)); } // If more values are still coming from the stream, we're not done yet diff --git a/futures-util/src/stream/stream/for_each_concurrent.rs b/futures-util/src/stream/stream/for_each_concurrent.rs index cee0ba19..6c18753e 100644 --- a/futures-util/src/stream/stream/for_each_concurrent.rs +++ b/futures-util/src/stream/stream/for_each_concurrent.rs @@ -1,7 +1,7 @@ use crate::stream::{FuturesUnordered, StreamExt}; use core::fmt; -use core::pin::Pin; use core::num::NonZeroUsize; +use core::pin::Pin; use futures_core::future::{FusedFuture, Future}; use futures_core::stream::Stream; use futures_core::task::{Context, Poll}; @@ -35,9 +35,10 @@ where } impl ForEachConcurrent -where St: Stream, - F: FnMut(St::Item) -> Fut, - Fut: Future, +where + St: Stream, + F: FnMut(St::Item) -> Fut, + Fut: Future, { pub(super) fn new(stream: St, limit: Option, f: F) -> Self { Self { @@ -51,9 +52,10 @@ where St: Stream, } impl FusedFuture for ForEachConcurrent - where St: Stream, - F: FnMut(St::Item) -> Fut, - Fut: Future, +where + St: Stream, + F: FnMut(St::Item) -> Fut, + Fut: Future, { fn is_terminated(&self) -> bool { self.stream.is_none() && self.futures.is_empty() @@ -61,9 +63,10 @@ impl FusedFuture for ForEachConcurrent } impl Future for ForEachConcurrent - where St: Stream, - F: FnMut(St::Item) -> Fut, - Fut: Future, +where + St: Stream, + F: FnMut(St::Item) -> Fut, + Fut: Future, { type Output = (); @@ -80,7 +83,7 @@ impl Future for ForEachConcurrent Poll::Ready(Some(elem)) => { made_progress_this_iter = true; Some(elem) - }, + } Poll::Ready(None) => { stream_completed = true; None @@ -102,9 +105,9 @@ impl Future for ForEachConcurrent Poll::Ready(Some(())) => made_progress_this_iter = true, Poll::Ready(None) => { if this.stream.is_none() { - return Poll::Ready(()) + return Poll::Ready(()); } - }, + } Poll::Pending => {} } diff --git a/futures-util/src/stream/stream/split.rs b/futures-util/src/stream/stream/split.rs index 997b9747..3a72fee3 100644 --- a/futures-util/src/stream/stream/split.rs +++ b/futures-util/src/stream/stream/split.rs @@ -1,9 +1,9 @@ +use core::fmt; +use core::pin::Pin; use futures_core::ready; use futures_core::stream::Stream; use futures_core::task::{Context, Poll}; use futures_sink::Sink; -use core::fmt; -use core::pin::Pin; use crate::lock::BiLock; @@ -20,7 +20,8 @@ impl SplitStream { /// together. Succeeds only if the `SplitStream` and `SplitSink` are /// a matching pair originating from the same call to `StreamExt::split`. pub fn reunite(self, other: SplitSink) -> Result> - where S: Sink, + where + S: Sink, { other.reunite(self) } @@ -36,10 +37,7 @@ impl Stream for SplitStream { #[allow(bad_style)] fn SplitSink, Item>(lock: BiLock) -> SplitSink { - SplitSink { - lock, - slot: None, - } + SplitSink { lock, slot: None } } /// A `Sink` part of the split pair @@ -58,14 +56,16 @@ impl + Unpin, Item> SplitSink { /// together. Succeeds only if the `SplitStream` and `SplitSink` are /// a matching pair originating from the same call to `StreamExt::split`. pub fn reunite(self, other: SplitStream) -> Result> { - self.lock.reunite(other.0).map_err(|err| { - ReuniteError(SplitSink(err.0), SplitStream(err.1)) - }) + self.lock.reunite(other.0).map_err(|err| ReuniteError(SplitSink(err.0), SplitStream(err.1))) } } impl, Item> SplitSink { - fn poll_flush_slot(mut inner: Pin<&mut S>, slot: &mut Option, cx: &mut Context<'_>) -> Poll> { + fn poll_flush_slot( + mut inner: Pin<&mut S>, + slot: &mut Option, + cx: &mut Context<'_>, + ) -> Poll> { if slot.is_some() { ready!(inner.as_mut().poll_ready(cx))?; Poll::Ready(inner.start_send(slot.take().unwrap())) @@ -74,7 +74,10 @@ impl, Item> SplitSink { } } - fn poll_lock_and_flush_slot(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + fn poll_lock_and_flush_slot( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { let this = &mut *self; let mut inner = ready!(this.lock.poll_lock(cx)); Self::poll_flush_slot(inner.as_pin_mut(), &mut this.slot, cx) @@ -127,9 +130,7 @@ pub struct ReuniteError(pub SplitSink, pub SplitStream); impl fmt::Debug for ReuniteError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_tuple("ReuniteError") - .field(&"...") - .finish() + f.debug_tuple("ReuniteError").field(&"...").finish() } } diff --git a/futures-util/src/stream/try_stream/try_buffer_unordered.rs b/futures-util/src/stream/try_stream/try_buffer_unordered.rs index 71c6fc7e..9a899d4e 100644 --- a/futures-util/src/stream/try_stream/try_buffer_unordered.rs +++ b/futures-util/src/stream/try_stream/try_buffer_unordered.rs @@ -1,12 +1,12 @@ -use crate::stream::{Fuse, FuturesUnordered, StreamExt, IntoStream}; use crate::future::{IntoFuture, TryFutureExt}; +use crate::stream::{Fuse, FuturesUnordered, IntoStream, StreamExt}; +use core::pin::Pin; use futures_core::future::TryFuture; use futures_core::stream::{Stream, TryStream}; use futures_core::task::{Context, Poll}; #[cfg(feature = "sink")] use futures_sink::Sink; use pin_project_lite::pin_project; -use core::pin::Pin; pin_project! { /// Stream for the @@ -24,8 +24,9 @@ pin_project! { } impl TryBufferUnordered - where St: TryStream, - St::Ok: TryFuture, +where + St: TryStream, + St::Ok: TryFuture, { pub(super) fn new(stream: St, n: usize) -> Self { Self { @@ -39,15 +40,13 @@ impl TryBufferUnordered } impl Stream for TryBufferUnordered - where St: TryStream, - St::Ok: TryFuture, +where + St: TryStream, + St::Ok: TryFuture, { type Item = Result<::Ok, St::Error>; - fn poll_next( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let mut this = self.project(); // First up, try to spawn off as many futures as possible by filling up @@ -77,8 +76,9 @@ impl Stream for TryBufferUnordered // Forwarding impl of Sink from the underlying stream #[cfg(feature = "sink")] impl Sink for TryBufferUnordered - where S: TryStream + Sink, - S::Ok: TryFuture, +where + S: TryStream + Sink, + S::Ok: TryFuture, { type Error = E; diff --git a/futures-util/src/stream/try_stream/try_buffered.rs b/futures-util/src/stream/try_stream/try_buffered.rs index ff7e8447..45bd3f8c 100644 --- a/futures-util/src/stream/try_stream/try_buffered.rs +++ b/futures-util/src/stream/try_stream/try_buffered.rs @@ -1,12 +1,12 @@ -use crate::stream::{Fuse, FuturesOrdered, StreamExt, IntoStream}; use crate::future::{IntoFuture, TryFutureExt}; +use crate::stream::{Fuse, FuturesOrdered, IntoStream, StreamExt}; +use core::pin::Pin; use futures_core::future::TryFuture; use futures_core::stream::{Stream, TryStream}; use futures_core::task::{Context, Poll}; #[cfg(feature = "sink")] use futures_sink::Sink; use pin_project_lite::pin_project; -use core::pin::Pin; pin_project! { /// Stream for the [`try_buffered`](super::TryStreamExt::try_buffered) method. @@ -47,10 +47,7 @@ where { type Item = Result<::Ok, St::Error>; - fn poll_next( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let mut this = self.project(); // First up, try to spawn off as many futures as possible by filling up diff --git a/futures-util/src/stream/try_stream/try_for_each_concurrent.rs b/futures-util/src/stream/try_stream/try_for_each_concurrent.rs index d2f4b0fe..62734c74 100644 --- a/futures-util/src/stream/try_stream/try_for_each_concurrent.rs +++ b/futures-util/src/stream/try_stream/try_for_each_concurrent.rs @@ -1,8 +1,8 @@ use crate::stream::{FuturesUnordered, StreamExt}; use core::fmt; use core::mem; -use core::pin::Pin; use core::num::NonZeroUsize; +use core::pin::Pin; use futures_core::future::{FusedFuture, Future}; use futures_core::stream::TryStream; use futures_core::task::{Context, Poll}; @@ -37,9 +37,10 @@ where } impl FusedFuture for TryForEachConcurrent - where St: TryStream, - F: FnMut(St::Ok) -> Fut, - Fut: Future>, +where + St: TryStream, + F: FnMut(St::Ok) -> Fut, + Fut: Future>, { fn is_terminated(&self) -> bool { self.stream.is_none() && self.futures.is_empty() @@ -47,9 +48,10 @@ impl FusedFuture for TryForEachConcurrent } impl TryForEachConcurrent -where St: TryStream, - F: FnMut(St::Ok) -> Fut, - Fut: Future>, +where + St: TryStream, + F: FnMut(St::Ok) -> Fut, + Fut: Future>, { pub(super) fn new(stream: St, limit: Option, f: F) -> Self { Self { @@ -63,9 +65,10 @@ where St: TryStream, } impl Future for TryForEachConcurrent - where St: TryStream, - F: FnMut(St::Ok) -> Fut, - Fut: Future>, +where + St: TryStream, + F: FnMut(St::Ok) -> Fut, + Fut: Future>, { type Output = Result<(), St::Error>; @@ -85,7 +88,7 @@ impl Future for TryForEachConcurrent Poll::Ready(Some(Ok(elem))) => { made_progress_this_iter = true; Some(elem) - }, + } Poll::Ready(None) => { this.stream.set(None); None @@ -109,9 +112,9 @@ impl Future for TryForEachConcurrent Poll::Ready(Some(Ok(()))) => made_progress_this_iter = true, Poll::Ready(None) => { if this.stream.is_none() { - return Poll::Ready(Ok(())) + return Poll::Ready(Ok(())); } - }, + } Poll::Pending => {} Poll::Ready(Some(Err(e))) => { // Empty the stream and futures so that we know diff --git a/futures/tests_disabled/all.rs b/futures/tests_disabled/all.rs index 6c7e11cf..a7a57104 100644 --- a/futures/tests_disabled/all.rs +++ b/futures/tests_disabled/all.rs @@ -1,27 +1,27 @@ -use futures::future; -use futures::executor::block_on; use futures::channel::oneshot::{self, Canceled}; +use futures::executor::block_on; +use futures::future; use std::sync::mpsc::{channel, TryRecvError}; -mod support; -use support::*; +// mod support; +// use support::*; fn unselect(r: Result, Either<(E, B), (E, A)>>) -> Result { match r { - Ok(Either::Left((t, _))) | - Ok(Either::Right((t, _))) => Ok(t), - Err(Either::Left((e, _))) | - Err(Either::Right((e, _))) => Err(e), + Ok(Either::Left((t, _))) | Ok(Either::Right((t, _))) => Ok(t), + Err(Either::Left((e, _))) | Err(Either::Right((e, _))) => Err(e), } } #[test] fn result_smoke() { fn is_future_v(_: C) - where A: Send + 'static, - B: Send + 'static, - C: Future - {} + where + A: Send + 'static, + B: Send + 'static, + C: Future, + { + } is_future_v::(f_ok(1).map(|a| a + 1)); is_future_v::(f_ok(1).map_err(|a| a + 1)); @@ -64,7 +64,9 @@ fn result_smoke() { #[test] fn test_empty() { - fn empty() -> Empty { future::empty() } + fn empty() -> Empty { + future::empty() + } assert_empty(|| empty()); assert_empty(|| empty().select(empty())); @@ -105,16 +107,22 @@ fn flatten() { #[test] fn smoke_oneshot() { - assert_done(|| { - let (c, p) = oneshot::channel(); - c.send(1).unwrap(); - p - }, Ok(1)); - assert_done(|| { - let (c, p) = oneshot::channel::(); - drop(c); - p - }, Err(Canceled)); + assert_done( + || { + let (c, p) = oneshot::channel(); + c.send(1).unwrap(); + p + }, + Ok(1), + ); + assert_done( + || { + let (c, p) = oneshot::channel::(); + drop(c); + p + }, + Err(Canceled), + ); let mut completes = Vec::new(); assert_empty(|| { let (a, b) = oneshot::channel::(); @@ -129,9 +137,7 @@ fn smoke_oneshot() { let (c, p) = oneshot::channel::(); drop(c); let (tx, rx) = channel(); - p.then(move |_| { - tx.send(()) - }).forget(); + p.then(move |_| tx.send(())).forget(); rx.recv().unwrap(); } @@ -139,8 +145,14 @@ fn smoke_oneshot() { fn select_cancels() { let ((a, b), (c, d)) = (oneshot::channel::(), oneshot::channel::()); let ((btx, brx), (dtx, drx)) = (channel(), channel()); - let b = b.map(move |b| { btx.send(b).unwrap(); b }); - let d = d.map(move |d| { dtx.send(d).unwrap(); d }); + let b = b.map(move |b| { + btx.send(b).unwrap(); + b + }); + let d = d.map(move |d| { + dtx.send(d).unwrap(); + d + }); let mut f = b.select(d).then(unselect); // assert!(f.poll(&mut Task::new()).is_pending()); @@ -156,8 +168,14 @@ fn select_cancels() { let ((a, b), (c, d)) = (oneshot::channel::(), oneshot::channel::()); let ((btx, _brx), (dtx, drx)) = (channel(), channel()); - let b = b.map(move |b| { btx.send(b).unwrap(); b }); - let d = d.map(move |d| { dtx.send(d).unwrap(); d }); + let b = b.map(move |b| { + btx.send(b).unwrap(); + b + }); + let d = d.map(move |d| { + dtx.send(d).unwrap(); + d + }); let mut f = b.select(d).then(unselect); assert!(f.poll(lw).ok().unwrap().is_pending()); @@ -173,8 +191,14 @@ fn select_cancels() { fn join_cancels() { let ((a, b), (c, d)) = (oneshot::channel::(), oneshot::channel::()); let ((btx, _brx), (dtx, drx)) = (channel(), channel()); - let b = b.map(move |b| { btx.send(b).unwrap(); b }); - let d = d.map(move |d| { dtx.send(d).unwrap(); d }); + let b = b.map(move |b| { + btx.send(b).unwrap(); + b + }); + let d = d.map(move |d| { + dtx.send(d).unwrap(); + d + }); let mut f = b.join(d); drop(a); @@ -185,8 +209,14 @@ fn join_cancels() { let ((a, b), (c, d)) = (oneshot::channel::(), oneshot::channel::()); let ((btx, _brx), (dtx, drx)) = (channel(), channel()); - let b = b.map(move |b| { btx.send(b).unwrap(); b }); - let d = d.map(move |d| { dtx.send(d).unwrap(); d }); + let b = b.map(move |b| { + btx.send(b).unwrap(); + b + }); + let d = d.map(move |d| { + dtx.send(d).unwrap(); + d + }); let (tx, rx) = channel(); let f = b.join(d); @@ -194,7 +224,8 @@ fn join_cancels() { tx.send(()).unwrap(); let res: Result<(), ()> = Ok(()); res - }).forget(); + }) + .forget(); assert!(rx.try_recv().is_err()); drop(a); rx.recv().unwrap(); @@ -243,7 +274,6 @@ fn join_incomplete() { }) } - #[test] fn select2() { assert_done(|| f_ok(2).select(empty()).then(unselect), Ok(2)); @@ -251,14 +281,15 @@ fn select2() { assert_done(|| f_err(2).select(empty()).then(unselect), Err(2)); assert_done(|| empty().select(f_err(2)).then(unselect), Err(2)); - assert_done(|| { - f_ok(1).select(f_ok(2)) - .map_err(|_| 0) - .and_then(|either_tup| { - let (a, b) = either_tup.into_inner(); - b.map(move |b| a + b) - }) - }, Ok(3)); + assert_done( + || { + f_ok(1).select(f_ok(2)).map_err(|_| 0).and_then(|either_tup| { + let (a, b) = either_tup.into_inner(); + b.map(move |b| a + b) + }) + }, + Ok(3), + ); // Finish one half of a select and then fail the second, ensuring that we // get the notification of the second one. @@ -297,8 +328,14 @@ fn select2() { { let ((_a, b), (_c, d)) = (oneshot::channel::(), oneshot::channel::()); let ((btx, brx), (dtx, drx)) = (channel(), channel()); - let b = b.map(move |v| { btx.send(v).unwrap(); v }); - let d = d.map(move |v| { dtx.send(v).unwrap(); v }); + let b = b.map(move |v| { + btx.send(v).unwrap(); + v + }); + let d = d.map(move |v| { + dtx.send(v).unwrap(); + v + }); let f = b.select(d); drop(f); assert!(drx.recv().is_err()); @@ -309,8 +346,14 @@ fn select2() { { let ((_a, b), (_c, d)) = (oneshot::channel::(), oneshot::channel::()); let ((btx, brx), (dtx, drx)) = (channel(), channel()); - let b = b.map(move |v| { btx.send(v).unwrap(); v }); - let d = d.map(move |v| { dtx.send(v).unwrap(); v }); + let b = b.map(move |v| { + btx.send(v).unwrap(); + v + }); + let d = d.map(move |v| { + dtx.send(v).unwrap(); + v + }); let mut f = b.select(d); let _res = noop_waker_lw(|lw| f.poll(lw)); drop(f); @@ -322,8 +365,14 @@ fn select2() { { let ((a, b), (_c, d)) = (oneshot::channel::(), oneshot::channel::()); let ((btx, brx), (dtx, drx)) = (channel(), channel()); - let b = b.map(move |v| { btx.send(v).unwrap(); v }); - let d = d.map(move |v| { dtx.send(v).unwrap(); v }); + let b = b.map(move |v| { + btx.send(v).unwrap(); + v + }); + let d = d.map(move |v| { + dtx.send(v).unwrap(); + v + }); let (tx, rx) = channel(); b.select(d).map(move |_| tx.send(()).unwrap()).forget(); drop(a); diff --git a/futures/tests_disabled/bilock.rs b/futures/tests_disabled/bilock.rs index c1bc33f5..0166ca48 100644 --- a/futures/tests_disabled/bilock.rs +++ b/futures/tests_disabled/bilock.rs @@ -1,11 +1,11 @@ -use futures::task; -use futures::stream; use futures::future; +use futures::stream; +use futures::task; use futures_util::lock::BiLock; use std::thread; -mod support; -use support::*; +// mod support; +// use support::*; #[test] fn smoke() { @@ -41,9 +41,9 @@ fn smoke() { }); assert!(task::spawn(future) - .poll_future_notify(¬ify_noop(), 0) - .expect("failure in poll") - .is_ready()); + .poll_future_notify(¬ify_noop(), 0) + .expect("failure in poll") + .is_ready()); } #[test] @@ -51,10 +51,7 @@ fn concurrent() { const N: usize = 10000; let (a, b) = BiLock::new(0); - let a = Increment { - a: Some(a), - remaining: N, - }; + let a = Increment { a: Some(a), remaining: N }; let b = stream::iter_ok(0..N).fold(b, |b, _n| { b.lock().map(|mut b| { *b += 1; @@ -89,7 +86,7 @@ fn concurrent() { fn poll(&mut self) -> Poll, ()> { loop { if self.remaining == 0 { - return Ok(self.a.take().unwrap().into()) + return Ok(self.a.take().unwrap().into()); } let a = self.a.as_ref().unwrap(); diff --git a/futures/tests_disabled/stream.rs b/futures/tests_disabled/stream.rs index ef0676db..854dbad8 100644 --- a/futures/tests_disabled/stream.rs +++ b/futures/tests_disabled/stream.rs @@ -1,26 +1,26 @@ +use futures::channel::mpsc; +use futures::channel::oneshot; use futures::executor::{block_on, block_on_stream}; use futures::future::{err, ok}; use futures::stream::{empty, iter_ok, poll_fn, Peekable}; -use futures::channel::oneshot; -use futures::channel::mpsc; -mod support; -use support::*; +// mod support; +// use support::*; pub struct Iter { iter: I, } pub fn iter(i: J) -> Iter - where J: IntoIterator>, +where + J: IntoIterator>, { - Iter { - iter: i.into_iter(), - } + Iter { iter: i.into_iter() } } impl Stream for Iter - where I: Iterator>, +where + I: Iterator>, { type Item = T; type Error = E; @@ -34,21 +34,15 @@ impl Stream for Iter } } -fn list() -> Box + Send> { +fn list() -> Box + Send> { let (tx, rx) = mpsc::channel(1); - tx.send(Ok(1)) - .and_then(|tx| tx.send(Ok(2))) - .and_then(|tx| tx.send(Ok(3))) - .forget(); + tx.send(Ok(1)).and_then(|tx| tx.send(Ok(2))).and_then(|tx| tx.send(Ok(3))).forget(); Box::new(rx.then(|r| r.unwrap())) } -fn err_list() -> Box + Send> { +fn err_list() -> Box + Send> { let (tx, rx) = mpsc::channel(1); - tx.send(Ok(1)) - .and_then(|tx| tx.send(Ok(2))) - .and_then(|tx| tx.send(Err(3))) - .forget(); + tx.send(Ok(1)).and_then(|tx| tx.send(Ok(2))).and_then(|tx| tx.send(Err(3))).forget(); Box::new(rx.then(|r| r.unwrap())) } @@ -89,40 +83,31 @@ fn filter() { #[test] fn filter_map() { - assert_done(|| list().filter_map(|x| { - ok(if x % 2 == 0 { - Some(x + 10) - } else { - None - }) - }).collect(), Ok(vec![12])); + assert_done( + || list().filter_map(|x| ok(if x % 2 == 0 { Some(x + 10) } else { None })).collect(), + Ok(vec![12]), + ); } #[test] fn and_then() { assert_done(|| list().and_then(|a| Ok(a + 1)).collect(), Ok(vec![2, 3, 4])); - assert_done(|| list().and_then(|a| err::(a as u32)).collect::>(), - Err(1)); + assert_done(|| list().and_then(|a| err::(a as u32)).collect::>(), Err(1)); } #[test] fn then() { assert_done(|| list().then(|a| a.map(|e| e + 1)).collect(), Ok(vec![2, 3, 4])); - } #[test] fn or_else() { - assert_done(|| err_list().or_else(|a| { - ok::(a as i32) - }).collect(), Ok(vec![1, 2, 3])); + assert_done(|| err_list().or_else(|a| ok::(a as i32)).collect(), Ok(vec![1, 2, 3])); } #[test] fn flatten() { - assert_done(|| list().map(|_| list()).flatten().collect(), - Ok(vec![1, 2, 3, 1, 2, 3, 1, 2, 3])); - + assert_done(|| list().map(|_| list()).flatten().collect(), Ok(vec![1, 2, 3, 1, 2, 3, 1, 2, 3])); } #[test] @@ -132,9 +117,7 @@ fn skip() { #[test] fn skip_passes_errors_through() { - let mut s = block_on_stream( - iter(vec![Err(1), Err(2), Ok(3), Ok(4), Ok(5)]).skip(1) - ); + let mut s = block_on_stream(iter(vec![Err(1), Err(2), Ok(3), Ok(4), Ok(5)]).skip(1)); assert_eq!(s.next(), Some(Err(1))); assert_eq!(s.next(), Some(Err(2))); assert_eq!(s.next(), Some(Ok(4))); @@ -144,8 +127,7 @@ fn skip_passes_errors_through() { #[test] fn skip_while() { - assert_done(|| list().skip_while(|e| Ok(*e % 2 == 1)).collect(), - Ok(vec![2, 3])); + assert_done(|| list().skip_while(|e| Ok(*e % 2 == 1)).collect(), Ok(vec![2, 3])); } #[test] fn take() { @@ -154,8 +136,7 @@ fn take() { #[test] fn take_while() { - assert_done(|| list().take_while(|e| Ok(*e < 3)).collect(), - Ok(vec![1, 2])); + assert_done(|| list().take_while(|e| Ok(*e < 3)).collect(), Ok(vec![1, 2])); } #[test] @@ -193,9 +174,9 @@ fn buffered() { let (a, b) = oneshot::channel::(); let (c, d) = oneshot::channel::(); - tx.send(Box::new(b.recover(|_| panic!())) as Box + Send>) - .and_then(|tx| tx.send(Box::new(d.map_err(|_| panic!())))) - .forget(); + tx.send(Box::new(b.recover(|_| panic!())) as Box + Send>) + .and_then(|tx| tx.send(Box::new(d.map_err(|_| panic!())))) + .forget(); let mut rx = rx.buffered(2); sassert_empty(&mut rx); @@ -211,9 +192,9 @@ fn buffered() { let (a, b) = oneshot::channel::(); let (c, d) = oneshot::channel::(); - tx.send(Box::new(b.recover(|_| panic!())) as Box + Send>) - .and_then(|tx| tx.send(Box::new(d.map_err(|_| panic!())))) - .forget(); + tx.send(Box::new(b.recover(|_| panic!())) as Box + Send>) + .and_then(|tx| tx.send(Box::new(d.map_err(|_| panic!())))) + .forget(); let mut rx = rx.buffered(1); sassert_empty(&mut rx); @@ -233,8 +214,8 @@ fn unordered() { let (c, d) = oneshot::channel::(); tx.send(Box::new(b.recover(|_| panic!())) as Box + Send>) - .and_then(|tx| tx.send(Box::new(d.recover(|_| panic!())))) - .forget(); + .and_then(|tx| tx.send(Box::new(d.recover(|_| panic!())))) + .forget(); let mut rx = rx.buffer_unordered(2); sassert_empty(&mut rx); @@ -250,8 +231,8 @@ fn unordered() { let (c, d) = oneshot::channel::(); tx.send(Box::new(b.recover(|_| panic!())) as Box + Send>) - .and_then(|tx| tx.send(Box::new(d.recover(|_| panic!())))) - .forget(); + .and_then(|tx| tx.send(Box::new(d.recover(|_| panic!())))) + .forget(); // We don't even get to see `c` until `a` completes. let mut rx = rx.buffer_unordered(1); @@ -267,21 +248,17 @@ fn unordered() { #[test] fn zip() { - assert_done(|| list().zip(list()).collect(), - Ok(vec![(1, 1), (2, 2), (3, 3)])); - assert_done(|| list().zip(list().take(2)).collect(), - Ok(vec![(1, 1), (2, 2)])); - assert_done(|| list().take(2).zip(list()).collect(), - Ok(vec![(1, 1), (2, 2)])); + assert_done(|| list().zip(list()).collect(), Ok(vec![(1, 1), (2, 2), (3, 3)])); + assert_done(|| list().zip(list().take(2)).collect(), Ok(vec![(1, 1), (2, 2)])); + assert_done(|| list().take(2).zip(list()).collect(), Ok(vec![(1, 1), (2, 2)])); assert_done(|| err_list().zip(list()).collect::>(), Err(3)); - assert_done(|| list().zip(list().map(|x| x + 1)).collect(), - Ok(vec![(1, 2), (2, 3), (3, 4)])); + assert_done(|| list().zip(list().map(|x| x + 1)).collect(), Ok(vec![(1, 2), (2, 3), (3, 4)])); } #[test] fn peek() { struct Peek { - inner: Peekable + Send>> + inner: Peekable + Send>>, } impl Future for Peek { @@ -299,15 +276,12 @@ fn peek() { } } - block_on(Peek { - inner: list().peekable(), - }).unwrap() + block_on(Peek { inner: list().peekable() }).unwrap() } #[test] fn wait() { - assert_eq!(block_on_stream(list()).collect::, _>>(), - Ok(vec![1, 2, 3])); + assert_eq!(block_on_stream(list()).collect::, _>>(), Ok(vec![1, 2, 3])); } #[test] @@ -337,8 +311,10 @@ fn forward() { let v = block_on(iter_ok::<_, Never>(vec![2, 3]).forward(v)).unwrap().1; assert_eq!(v, vec![0, 1, 2, 3]); - assert_done(move || iter_ok::<_, Never>(vec![4, 5]).forward(v).map(|(_, s)| s), - Ok(vec![0, 1, 2, 3, 4, 5])); + assert_done( + move || iter_ok::<_, Never>(vec![4, 5]).forward(v).map(|(_, s)| s), + Ok(vec![0, 1, 2, 3, 4, 5]), + ); } #[test] diff --git a/tools/fmt.sh b/tools/fmt.sh new file mode 100755 index 00000000..5438030a --- /dev/null +++ b/tools/fmt.sh @@ -0,0 +1,26 @@ +#!/bin/bash + +# Format all rust code. +# +# Usage: +# ./tools/fmt.sh +# +# This script is needed because `cargo fmt` cannot recognize modules defined inside macros. +# Refs: https://github.com/rust-lang/rustfmt/issues/4078 + +set -euo pipefail +IFS=$'\n\t' + +cd "$(cd "$(dirname "${0}")" && pwd)"/.. + +# shellcheck disable=SC2046 +if [[ -z "${CI:-}" ]]; then + ( + # `cargo fmt` cannot recognize modules defined inside macros so run rustfmt directly. + rustfmt $(git ls-files "*.rs") + ) +else + ( + rustfmt --check $(git ls-files "*.rs") + ) +fi