From 5cc3a5dd403424e9963c77925ed60def755d037c Mon Sep 17 00:00:00 2001 From: Amanieu d'Antras Date: Wed, 11 May 2016 19:26:54 +0100 Subject: [PATCH] Initial commit --- .gitignore | 2 + .travis.yml | 25 +++ Cargo.toml | 13 ++ LICENSE-APACHE | 201 +++++++++++++++++++ LICENSE-MIT | 25 +++ README.md | 130 ++++++++++++ src/condvar.rs | 334 ++++++++++++++++++++++++++++++ src/lib.rs | 120 +++++++++++ src/mutex.rs | 354 ++++++++++++++++++++++++++++++++ src/once.rs | 383 +++++++++++++++++++++++++++++++++++ src/parking_lot.rs | 469 +++++++++++++++++++++++++++++++++++++++++++ src/raw_mutex.rs | 146 ++++++++++++++ src/raw_rwlock.rs | 333 ++++++++++++++++++++++++++++++ src/rwlock.rs | 463 ++++++++++++++++++++++++++++++++++++++++++ src/thread_parker.rs | 74 +++++++ src/word_lock.rs | 187 +++++++++++++++++ 16 files changed, 3259 insertions(+) create mode 100644 .gitignore create mode 100644 .travis.yml create mode 100644 Cargo.toml create mode 100644 LICENSE-APACHE create mode 100644 LICENSE-MIT create mode 100644 README.md create mode 100644 src/condvar.rs create mode 100644 src/lib.rs create mode 100644 src/mutex.rs create mode 100644 src/once.rs create mode 100644 src/parking_lot.rs create mode 100644 src/raw_mutex.rs create mode 100644 src/raw_rwlock.rs create mode 100644 src/rwlock.rs create mode 100644 src/thread_parker.rs create mode 100644 src/word_lock.rs diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..a9d37c5 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +target +Cargo.lock diff --git a/.travis.yml b/.travis.yml new file mode 100644 index 0000000..c6fa660 --- /dev/null +++ b/.travis.yml @@ -0,0 +1,25 @@ +language: rust +sudo: false + +rust: +- nightly + +before_script: +- | + pip install 'travis-cargo<0.2' --user && + export PATH=$HOME/.local/bin:$PATH + +script: +- travis-cargo build +- travis-cargo test +- cargo doc --no-deps + +after_success: +- travis-cargo --only nightly doc-upload + +env: + global: + - TRAVIS_CARGO_NIGHTLY_FEATURE="" + +notifications: + email: false diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..83d87ec --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,13 @@ +[package] +name = "parking_lot" +version = "0.1.0" +authors = ["Amanieu d'Antras "] +description = "Compact and efficient synchronization primitives. Also provides an API for creating custom synchronization primitives." +documentation = "https://amanieu.github.io/parking_lot/parking_lot/index.html" +license = "Apache-2.0/MIT" +repository = "https://github.com/Amanieu/parking_lot" +readme = "README.md" +keywords = ["mutex", "condvar", "rwlock", "once", "thread"] + +[dev-dependencies] +rand = "0.3" diff --git a/LICENSE-APACHE b/LICENSE-APACHE new file mode 100644 index 0000000..16fe87b --- /dev/null +++ b/LICENSE-APACHE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + +TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + +1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + +2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + +3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + +4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + +5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + +6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + +7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + +8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + +9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + +END OF TERMS AND CONDITIONS + +APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + +Copyright [yyyy] [name of copyright owner] + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. diff --git a/LICENSE-MIT b/LICENSE-MIT new file mode 100644 index 0000000..40b8817 --- /dev/null +++ b/LICENSE-MIT @@ -0,0 +1,25 @@ +Copyright (c) 2016 The Rust Project Developers + +Permission is hereby granted, free of charge, to any +person obtaining a copy of this software and associated +documentation files (the "Software"), to deal in the +Software without restriction, including without +limitation the rights to use, copy, modify, merge, +publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software +is furnished to do so, subject to the following +conditions: + +The above copyright notice and this permission notice +shall be included in all copies or substantial portions +of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF +ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED +TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A +PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT +SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY +CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR +IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +DEALINGS IN THE SOFTWARE. diff --git a/README.md b/README.md new file mode 100644 index 0000000..6e48693 --- /dev/null +++ b/README.md @@ -0,0 +1,130 @@ +parking_lot +============ + +[![Build Status](https://travis-ci.org/Amanieu/parking_lot.svg?branch=master)](https://travis-ci.org/Amanieu/parking_lot) [![Crates.io](https://img.shields.io/crates/v/parking_lot.svg)](https://crates.io/crates/parking_lot) + +[Documentation](https://amanieu.github.io/parking_lot/parking_lot/index.html) + +This library provides implementations of `Mutex`, `RwLock`, `Condvar` and +`Once` that are smaller, faster and more flexible than those in the Rust +standard library. It also exposes a low-level API for creating your own +efficient synchronization primitives. + +## Features + +The primitives provided by this library have several advantages over those +in the Rust standard library: + +1. `Mutex`, `Condvar` and `Once` only require 1 byte of storage space, and + `RwLock` only requires 1 word of storage space. On the other hand the + standard library primitives require a dynamically allocated `Box` to hold + OS-specific synchronization primitives. The small size of `Mutex` in + particular encourages the use of fine-grained locks to increase + parallelism. +2. Since they consist of just a single atomic variable, have constant + initializers and don't need destructors, these primitives can be used as + `static` global variables. The standard library primitives require + dynamic initialization and thus need to be lazily initialized with + `lazy_static!`. +3. Uncontended lock acquisition and release is done through fast inline + paths which only require a single atomic operation. +4. Microcontention (a contended lock with a short critical section) is + efficiently handled by spinning a few times while trying to acquire a + lock. +5. The locks are adaptive and will suspend a thread after a few failed spin + attempts. This makes the locks suitable for both long and short critical + sections. + +## The parking lot + +To keep these primitives small, all thread queuing and suspending +functionality is offloaded to the *parking lot*. The idea behind this is +based on the Webkit [`WTF::ParkingLot`] +(https://webkit.org/blog/6161/locking-in-webkit/) class, which essentially +consists of a hash table mapping of lock addresses to queues of parked +(sleeping) threads. The Webkit parking lot was itself inspired by Linux +[futexes](http://man7.org/linux/man-pages/man2/futex.2.html), but it is more +powerful since it allows invoking callbacks while holding a queue lock. + +*Parking* refers to suspending the thread while simultaneously enqueuing it +on a queue keyed by some address. *Unparking* refers to dequeuing a thread +from a queue keyed by some address and resuming it. The parking lot API +consists of just 3 functions: + +```rust,ignore +unsafe fn park(key: usize, + validate: &mut FnMut() -> bool, + before_sleep: &mut FnMut(), + timeout: Option) + -> bool +``` + +This function performs the following steps: + +1. Lock the queue associated with `key`. +2. Call `validate`, if it returns `false`, unlock the queue and return. +3. Add the current thread to the queue. +4. Unlock the queue. +5. Call `before_sleep`. +6. Sleep until we are unparked or `timeout` is reached. +7. Return `true` if we were unparked by another thread, `false` otherwise. + +```rust,ignore +unsafe fn unpark_one(key: usize, + callback: &mut FnMut(UnparkResult)) + -> UnparkResult +``` + +This function will unpark a single thread from the queue associated with +`key`. The `callback` function is invoked while holding the queue lock but +before the thread is unparked. The `UnparkResult` indicates whether the +queue was empty and, if not, whether there are still threads remaining in +the queue. + +```rust,ignore +unsafe fn unpark_all(key: usize) -> usize +``` + +This function will unpark all threads in the queue associated with `key`. It +returns the number of threads that were unparked. + +## Building custom synchronization primitives + +Building custom synchronization primitives is very simple since +`parking_lot` takes care of all the hard parts for you. The most commmon +case for a custom primitive would be to integrate a `Mutex` inside another +data type. Since a mutex only requires 2 bits, it can share space with other +data. For example, one could create an `ArcMutex` type that combines the +atomic reference count and the two mutex bits in the same atomic word. + +## Usage + +This crate currently requires a nightly Rust compiler. + +Add this to your `Cargo.toml`: + +```toml +[dependencies] +parking_lot = "0.1" +``` + +and this to your crate root: + +```rust +extern crate parking_lot; +``` + +## License + +Licensed under either of + + * Apache License, Version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or http://www.apache.org/licenses/LICENSE-2.0) + * MIT license ([LICENSE-MIT](LICENSE-MIT) or http://opensource.org/licenses/MIT) + +at your option. + +### Contribution + +Unless you explicitly state otherwise, any contribution intentionally submitted +for inclusion in the work by you, as defined in the Apache-2.0 license, shall be dual licensed as above, without any +additional terms or conditions. diff --git a/src/condvar.rs b/src/condvar.rs new file mode 100644 index 0000000..563a027 --- /dev/null +++ b/src/condvar.rs @@ -0,0 +1,334 @@ +// Copyright 2016 Amanieu d'Antras +// +// Licensed under the Apache License, Version 2.0, or the MIT license , at your option. This file may not be +// copied, modified, or distributed except according to those terms. + +use std::sync::atomic::{AtomicU8, Ordering}; +use std::time::{Duration, Instant}; +use parking_lot::{self, UnparkResult}; +use mutex::{MutexGuard, guard_lock}; + +/// A type indicating whether a timed wait on a condition variable returned +/// due to a time out or not. +#[derive(Debug, PartialEq, Eq, Copy, Clone)] +pub struct WaitTimeoutResult(bool); + +impl WaitTimeoutResult { + /// Returns whether the wait was known to have timed out. + pub fn timed_out(&self) -> bool { + self.0 + } +} + +/// A Condition Variable +/// +/// Condition variables represent the ability to block a thread such that it +/// consumes no CPU time while waiting for an event to occur. Condition +/// variables are typically associated with a boolean predicate (a condition) +/// and a mutex. The predicate is always verified inside of the mutex before +/// determining that thread must block. +/// +/// # Differences from the standard library `Condvar` +/// +/// - No spurious wakeups: A wait will only return a non-timeout result if it +/// was woken up by `notify_one` or `notify_all`. +/// - Only requires 1 byte of space, whereas the standard library boxes the +/// `Condvar` due to platform limitations. +/// - Can be statically constructed (requires the `const_fn` nightly feature). +/// - Does not require any drop glue when dropped. +/// - Inline fast path for the uncontended case. +/// +/// # Examples +/// +/// ``` +/// use parking_lot::{Mutex, Condvar}; +/// use std::sync::Arc; +/// use std::thread; +/// +/// let pair = Arc::new((Mutex::new(false), Condvar::new())); +/// let pair2 = pair.clone(); +/// +/// // Inside of our lock, spawn a new thread, and then wait for it to start +/// thread::spawn(move|| { +/// let &(ref lock, ref cvar) = &*pair2; +/// let mut started = lock.lock(); +/// *started = true; +/// cvar.notify_one(); +/// }); +/// +/// // wait for the thread to start up +/// let &(ref lock, ref cvar) = &*pair; +/// let mut started = lock.lock(); +/// while !*started { +/// cvar.wait(&mut started); +/// } +/// ``` +pub struct Condvar { + state: AtomicU8, +} + +impl Condvar { + /// Creates a new condition variable which is ready to be waited on and + /// notified. + #[inline] + pub const fn new() -> Condvar { + Condvar { state: AtomicU8::new(0) } + } + + /// Wakes up one blocked thread on this condvar. + /// + /// If there is a blocked thread on this condition variable, then it will + /// be woken up from its call to `wait` or `wait_timeout`. Calls to + /// `notify_one` are not buffered in any way. + /// + /// To wake up all threads, see `notify_all()`. + #[inline] + pub fn notify_one(&self) { + // Nothing to do if there are no waiting threads + if self.state.load(Ordering::Relaxed) == 0 { + return; + } + + unsafe { + // Unpark one thread + let addr = self as *const _ as usize; + let callback = &mut |result| { + // Clear our state if there are no more waiting threads + if result != UnparkResult::UnparkedNotLast { + self.state.store(0, Ordering::Relaxed); + } + }; + parking_lot::unpark_one(addr, callback); + } + } + + /// Wakes up all blocked threads on this condvar. + /// + /// This method will ensure that any current waiters on the condition + /// variable are awoken. Calls to `notify_all()` are not buffered in any + /// way. + /// + /// To wake up only one thread, see `notify_one()`. + #[inline] + pub fn notify_all(&self) { + // Nothing to do if there are no waiting threads + if self.state.load(Ordering::Relaxed) == 0 { + return; + } + + // Clear our state since we are going to wake all threads up anyways + self.state.store(0, Ordering::Relaxed); + + unsafe { + // Unpark all threads + let addr = self as *const _ as usize; + parking_lot::unpark_all(addr); + } + } + + /// Blocks the current thread until this condition variable receives a + /// notification. + /// + /// This function will atomically unlock the mutex specified (represented by + /// `mutex_guard`) and block the current thread. This means that any calls + /// to `notify_*()` which happen logically after the mutex is unlocked are + /// candidates to wake this thread up. When this function call returns, the + /// lock specified will have been re-acquired. + #[inline] + pub fn wait(&self, guard: &mut MutexGuard) { + unsafe { + let addr = self as *const _ as usize; + let validate = &mut || { + // This is done while locked to avoid races with notify_one + self.state.store(1, Ordering::Relaxed); + true + }; + let before_sleep = &mut || { + // Unlock the mutex before sleeping... + guard_lock(guard).unlock(); + }; + parking_lot::park(addr, validate, before_sleep, None); + + // ... and re-lock it once we are done sleeping + guard_lock(guard).lock(); + } + } + + /// Waits on this condition variable for a notification, timing out after + /// the specified time instant. + /// + /// The semantics of this function are equivalent to `wait()` except that + /// the thread will be blocked roughly until `timeout` is reached. This + /// method should not be used for precise timing due to anomalies such as + /// preemption or platform differences that may not cause the maximum + /// amount of time waited to be precisely `timeout`. + /// + /// The returned `WaitTimeoutResult` value indicates if the timeout is + /// known to have elapsed. + /// + /// Like `wait`, the lock specified will be re-acquired when this function + /// returns, regardless of whether the timeout elapsed or not. + #[inline] + pub fn wait_until(&self, + guard: &mut MutexGuard, + timeout: Instant) + -> WaitTimeoutResult { + unsafe { + let result; + if timeout <= Instant::now() { + // If the timeout is in the past, we still need to release and + // re-acquire the mutex. + guard_lock(guard).unlock(); + result = false; + } else { + let addr = self as *const _ as usize; + let validate = &mut || { + // This is done while locked to avoid races with notify_one + self.state.store(1, Ordering::Relaxed); + true + }; + let before_sleep = &mut || { + // Unlock the mutex before sleeping... + guard_lock(guard).unlock(); + }; + result = parking_lot::park(addr, validate, before_sleep, Some(timeout)); + } + + // ... and re-lock it once we are done sleeping + guard_lock(guard).lock(); + + WaitTimeoutResult(!result) + } + } + + /// Waits on this condition variable for a notification, timing out after a + /// specified duration. + /// + /// The semantics of this function are equivalent to `wait()` except that + /// the thread will be blocked for roughly no longer than `timeout`. This + /// method should not be used for precise timing due to anomalies such as + /// preemption or platform differences that may not cause the maximum + /// amount of time waited to be precisely `timeout`. + /// + /// The returned `WaitTimeoutResult` value indicates if the timeout is + /// known to have elapsed. + /// + /// Like `wait`, the lock specified will be re-acquired when this function + /// returns, regardless of whether the timeout elapsed or not. + #[inline] + pub fn wait_for(&self, + guard: &mut MutexGuard, + timeout: Duration) + -> WaitTimeoutResult { + self.wait_until(guard, Instant::now() + timeout) + } +} + +impl Default for Condvar { + fn default() -> Condvar { + Condvar::new() + } +} + +#[cfg(test)] +mod tests { + use std::sync::mpsc::channel; + use std::sync::Arc; + use std::thread; + use std::time::{Duration, Instant}; + use {Condvar, Mutex}; + + #[test] + fn smoke() { + let c = Condvar::new(); + c.notify_one(); + c.notify_all(); + } + + #[test] + fn notify_one() { + static C: Condvar = Condvar::new(); + static M: Mutex<()> = Mutex::new(()); + + let mut g = M.lock(); + let _t = thread::spawn(move || { + let _g = M.lock(); + C.notify_one(); + }); + C.wait(&mut g); + } + + #[test] + fn notify_all() { + const N: usize = 10; + + let data = Arc::new((Mutex::new(0), Condvar::new())); + let (tx, rx) = channel(); + for _ in 0..N { + let data = data.clone(); + let tx = tx.clone(); + thread::spawn(move || { + let &(ref lock, ref cond) = &*data; + let mut cnt = lock.lock(); + *cnt += 1; + if *cnt == N { + tx.send(()).unwrap(); + } + while *cnt != 0 { + cond.wait(&mut cnt); + } + tx.send(()).unwrap(); + }); + } + drop(tx); + + let &(ref lock, ref cond) = &*data; + rx.recv().unwrap(); + let mut cnt = lock.lock(); + *cnt = 0; + cond.notify_all(); + drop(cnt); + + for _ in 0..N { + rx.recv().unwrap(); + } + } + + #[test] + fn wait_for() { + static C: Condvar = Condvar::new(); + static M: Mutex<()> = Mutex::new(()); + + let mut g = M.lock(); + let no_timeout = C.wait_for(&mut g, Duration::from_millis(1)); + assert!(no_timeout.timed_out()); + let _t = thread::spawn(move || { + let _g = M.lock(); + C.notify_one(); + }); + let timeout_res = C.wait_for(&mut g, Duration::from_millis(u32::max_value() as u64)); + assert!(!timeout_res.timed_out()); + drop(g); + } + + #[test] + fn wait_until() { + static C: Condvar = Condvar::new(); + static M: Mutex<()> = Mutex::new(()); + + let mut g = M.lock(); + let no_timeout = C.wait_until(&mut g, Instant::now() + Duration::from_millis(1)); + assert!(no_timeout.timed_out()); + let _t = thread::spawn(move || { + let _g = M.lock(); + C.notify_one(); + }); + let timeout_res = C.wait_until(&mut g, + Instant::now() + + Duration::from_millis(u32::max_value() as u64)); + assert!(!timeout_res.timed_out()); + drop(g); + } +} diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..243490e --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,120 @@ +// Copyright 2016 Amanieu d'Antras +// +// Licensed under the Apache License, Version 2.0, or the MIT license , at your option. This file may not be +// copied, modified, or distributed except according to those terms. + +//! This library provides implementations of `Mutex`, `RwLock`, `Condvar` and +//! `Once` that are smaller, faster and more flexible than those in the Rust +//! standard library. It also exposes a low-level API for creating your own +//! efficient synchronization primitives. +//! +//! # Features +//! +//! The primitives provided by this library have several advantages over those +//! in the Rust standard library: +//! +//! 1. `Mutex`, `Condvar` and `Once` only require 1 byte of storage space, and +//! `RwLock` only requires 1 word of storage space. On the other hand the +//! standard library primitives require a dynamically allocated `Box` to hold +//! OS-specific synchronization primitives. The small size of `Mutex` in +//! particular encourages the use of fine-grained locks to increase +//! parallelism. +//! 2. Since they consist of just a single atomic variable, have constant +//! initializers and don't need destructors, these primitives can be used as +//! `static` global variables. The standard library primitives require +//! dynamic initialization and thus need to be lazily initialized with +//! `lazy_static!`. +//! 3. Uncontended lock acquisition and release is done through fast inline +//! paths which only require a single atomic operation. +//! 4. Microcontention (a contended lock with a short critical section) is +//! efficiently handled by spinning a few times while trying to acquire a +//! lock. +//! 5. The locks are adaptive and will suspend a thread after a few failed spin +//! attempts. This makes the locks suitable for both long and short critical +//! sections. +//! +//! # The parking lot +//! +//! To keep these primitives small, all thread queuing and suspending +//! functionality is offloaded to the *parking lot*. The idea behind this is +//! based on the Webkit [`WTF::ParkingLot`] +//! (https://webkit.org/blog/6161/locking-in-webkit/) class, which essentially +//! consists of a hash table mapping of lock addresses to queues of parked +//! (sleeping) threads. The Webkit parking lot was itself inspired by Linux +//! [futexes](http://man7.org/linux/man-pages/man2/futex.2.html), but it is more +//! powerful since it allows invoking callbacks while holding a queue lock. +//! +//! *Parking* refers to suspending the thread while simultaneously enqueuing it +//! on a queue keyed by some address. *Unparking* refers to dequeuing a thread +//! from a queue keyed by some address and resuming it. The parking lot API +//! consists of just 3 functions: +//! +//! ```rust,ignore +//! unsafe fn park(key: usize, +//! validate: &mut FnMut() -> bool, +//! before_sleep: &mut FnMut(), +//! timeout: Option) +//! -> bool +//! ``` +//! +//! This function performs the following steps: +//! +//! 1. Lock the queue associated with `key`. +//! 2. Call `validate`, if it returns `false`, unlock the queue and return. +//! 3. Add the current thread to the queue. +//! 4. Unlock the queue. +//! 5. Call `before_sleep`. +//! 6. Sleep until we are unparked or `timeout` is reached. +//! 7. Return `true` if we were unparked by another thread, `false` otherwise. +//! +//! ```rust,ignore +//! unsafe fn unpark_one(key: usize, +//! callback: &mut FnMut(UnparkResult)) +//! -> UnparkResult +//! ``` +//! +//! This function will unpark a single thread from the queue associated with +//! `key`. The `callback` function is invoked while holding the queue lock but +//! before the thread is unparked. The `UnparkResult` indicates whether the +//! queue was empty and, if not, whether there are still threads remaining in +//! the queue. +//! +//! ```rust,ignore +//! unsafe fn unpark_all(key: usize) -> usize +//! ``` +//! +//! This function will unpark all threads in the queue associated with `key`. It +//! returns the number of threads that were unparked. +//! +//! # Building custom synchronization primitives +//! +//! Building custom synchronization primitives is very simple since +//! `parking_lot` takes care of all the hard parts for you. The most commmon +//! case for a custom primitive would be to integrate a `Mutex` inside another +//! data type. Since a mutex only requires 2 bits, it can share space with other +//! data. For example, one could create an `ArcMutex` type that combines the +//! atomic reference count and the two mutex bits in the same atomic word. + +#![warn(missing_docs)] +#![feature(extended_compare_and_swap, const_fn, integer_atomics)] + +// Spin limit from JikesRVM & Webkit experiments +const SPIN_LIMIT: usize = 40; + +mod thread_parker; +mod word_lock; +mod parking_lot; +mod raw_mutex; +mod raw_rwlock; +mod condvar; +mod mutex; +mod rwlock; +mod once; + +pub use once::{Once, OnceState}; +pub use parking_lot::{UnparkResult, park, unpark_one, unpark_all}; +pub use mutex::{Mutex, MutexGuard}; +pub use condvar::{Condvar, WaitTimeoutResult}; +pub use rwlock::{RwLock, RwLockReadGuard, RwLockWriteGuard}; diff --git a/src/mutex.rs b/src/mutex.rs new file mode 100644 index 0000000..3dd37f7 --- /dev/null +++ b/src/mutex.rs @@ -0,0 +1,354 @@ +// Copyright 2016 Amanieu d'Antras +// +// Licensed under the Apache License, Version 2.0, or the MIT license , at your option. This file may not be +// copied, modified, or distributed except according to those terms. + +use std::cell::UnsafeCell; +use std::ops::{Deref, DerefMut}; +use std::fmt; +use raw_mutex::RawMutex; + +/// A mutual exclusion primitive useful for protecting shared data +/// +/// This mutex will block threads waiting for the lock to become available. The +/// mutex can also be statically initialized or created via a `new` +/// constructor. Each mutex has a type parameter which represents the data that +/// it is protecting. The data can only be accessed through the RAII guards +/// returned from `lock` and `try_lock`, which guarantees that the data is only +/// ever accessed when the mutex is locked. +/// +/// # Differences from the standard library `Mutex` +/// +/// - No poisoning, the lock is released normally on panic. +/// - Only requires 1 byte of space, whereas the standard library boxes the +/// `Mutex` due to platform limitations. +/// - A `MutexGuard` can be sent to another thread and unlocked there. +/// - Can be statically constructed (requires the `const_fn` nightly feature). +/// - Does not require any drop glue when dropped. +/// - Inline fast path for the uncontended case. +/// - Efficient handling of micro-contention using adaptive spinning. +/// +/// # Examples +/// +/// ``` +/// use std::sync::Arc; +/// use parking_lot::Mutex; +/// use std::thread; +/// use std::sync::mpsc::channel; +/// +/// const N: usize = 10; +/// +/// // Spawn a few threads to increment a shared variable (non-atomically), and +/// // let the main thread know once all increments are done. +/// // +/// // Here we're using an Arc to share memory among threads, and the data inside +/// // the Arc is protected with a mutex. +/// let data = Arc::new(Mutex::new(0)); +/// +/// let (tx, rx) = channel(); +/// for _ in 0..10 { +/// let (data, tx) = (data.clone(), tx.clone()); +/// thread::spawn(move || { +/// // The shared state can only be accessed once the lock is held. +/// // Our non-atomic increment is safe because we're the only thread +/// // which can access the shared state when the lock is held. +/// let mut data = data.lock(); +/// *data += 1; +/// if *data == N { +/// tx.send(()).unwrap(); +/// } +/// // the lock is unlocked here when `data` goes out of scope. +/// }); +/// } +/// +/// rx.recv().unwrap(); +/// ``` +pub struct Mutex { + mutex: RawMutex, + data: UnsafeCell, +} + +unsafe impl Send for Mutex {} +unsafe impl Sync for Mutex {} + +/// An RAII implementation of a "scoped lock" of a mutex. When this structure is +/// dropped (falls out of scope), the lock will be unlocked. +/// +/// The data protected by the mutex can be access through this guard via its +/// `Deref` and `DerefMut` implementations +#[must_use] +pub struct MutexGuard<'a, T: ?Sized + 'a> { + mutex: &'a RawMutex, + data: &'a mut T, +} + +impl Mutex { + /// Creates a new mutex in an unlocked state ready for use. + pub const fn new(val: T) -> Mutex { + Mutex { + data: UnsafeCell::new(val), + mutex: RawMutex::new(), + } + } + + /// Consumes this mutex, returning the underlying data. + pub fn into_inner(self) -> T { + unsafe { self.data.into_inner() } + } +} + +impl Mutex { + /// Acquires a mutex, blocking the current thread until it is able to do so. + /// + /// This function will block the local thread until it is available to acquire + /// the mutex. Upon returning, the thread is the only thread with the mutex + /// held. An RAII guard is returned to allow scoped unlock of the lock. When + /// the guard goes out of scope, the mutex will be unlocked. + /// + /// Attempts to lock a mutex in the thread which already holds the lock will + /// result is a deadlock. + pub fn lock(&self) -> MutexGuard { + self.mutex.lock(); + MutexGuard { + mutex: &self.mutex, + data: unsafe { &mut *self.data.get() }, + } + } + + /// Attempts to acquire this lock. + /// + /// If the lock could not be acquired at this time, then `Err` is returned. + /// Otherwise, an RAII guard is returned. The lock will be unlocked when the + /// guard is dropped. + /// + /// This function does not block. + pub fn try_lock(&self) -> Option> { + if self.mutex.try_lock() { + Some(MutexGuard { + mutex: &self.mutex, + data: unsafe { &mut *self.data.get() }, + }) + } else { + None + } + } + + /// Returns a mutable reference to the underlying data. + /// + /// Since this call borrows the `Mutex` mutably, no actual locking needs to + /// take place---the mutable borrow statically guarantees no locks exist. + pub fn get_mut(&mut self) -> &mut T { + unsafe { &mut *self.data.get() } + } +} + +impl Default for Mutex { + fn default() -> Mutex { + Mutex::new(Default::default()) + } +} + +impl fmt::Debug for Mutex { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match self.try_lock() { + Some(guard) => write!(f, "Mutex {{ data: {:?} }}", &*guard), + None => write!(f, "Mutex {{ }}"), + } + } +} + +impl<'a, T: ?Sized + 'a> Deref for MutexGuard<'a, T> { + type Target = T; + fn deref(&self) -> &T { + self.data + } +} + +impl<'a, T: ?Sized + 'a> DerefMut for MutexGuard<'a, T> { + fn deref_mut(&mut self) -> &mut T { + self.data + } +} + +impl<'a, T: ?Sized + 'a> Drop for MutexGuard<'a, T> { + fn drop(&mut self) { + self.mutex.unlock(); + } +} + +// Helper function used by Condvar +pub fn guard_lock<'a, T: ?Sized>(guard: &MutexGuard<'a, T>) -> &'a RawMutex { + &guard.mutex +} + +#[cfg(test)] +mod tests { + use std::sync::mpsc::channel; + use std::sync::Arc; + use std::sync::atomic::{AtomicUsize, Ordering}; + use std::thread; + use {Mutex, Condvar}; + + struct Packet(Arc<(Mutex, Condvar)>); + + #[derive(Eq, PartialEq, Debug)] + struct NonCopy(i32); + + unsafe impl Send for Packet {} + unsafe impl Sync for Packet {} + + #[test] + fn smoke() { + let m = Mutex::new(()); + drop(m.lock()); + drop(m.lock()); + } + + #[test] + fn lots_and_lots() { + static M: Mutex<()> = Mutex::new(()); + static mut CNT: u32 = 0; + const J: u32 = 1000; + const K: u32 = 3; + + fn inc() { + for _ in 0..J { + unsafe { + let _g = M.lock(); + CNT += 1; + } + } + } + + let (tx, rx) = channel(); + for _ in 0..K { + let tx2 = tx.clone(); + thread::spawn(move || { + inc(); + tx2.send(()).unwrap(); + }); + let tx2 = tx.clone(); + thread::spawn(move || { + inc(); + tx2.send(()).unwrap(); + }); + } + + drop(tx); + for _ in 0..2 * K { + rx.recv().unwrap(); + } + assert_eq!(unsafe { CNT }, J * K * 2); + } + + #[test] + fn try_lock() { + let m = Mutex::new(()); + *m.try_lock().unwrap() = (); + } + + #[test] + fn test_into_inner() { + let m = Mutex::new(NonCopy(10)); + assert_eq!(m.into_inner(), NonCopy(10)); + } + + #[test] + fn test_into_inner_drop() { + struct Foo(Arc); + impl Drop for Foo { + fn drop(&mut self) { + self.0.fetch_add(1, Ordering::SeqCst); + } + } + let num_drops = Arc::new(AtomicUsize::new(0)); + let m = Mutex::new(Foo(num_drops.clone())); + assert_eq!(num_drops.load(Ordering::SeqCst), 0); + { + let _inner = m.into_inner(); + assert_eq!(num_drops.load(Ordering::SeqCst), 0); + } + assert_eq!(num_drops.load(Ordering::SeqCst), 1); + } + + #[test] + fn test_get_mut() { + let mut m = Mutex::new(NonCopy(10)); + *m.get_mut() = NonCopy(20); + assert_eq!(m.into_inner(), NonCopy(20)); + } + + #[test] + fn test_mutex_arc_condvar() { + let packet = Packet(Arc::new((Mutex::new(false), Condvar::new()))); + let packet2 = Packet(packet.0.clone()); + let (tx, rx) = channel(); + let _t = thread::spawn(move || { + // wait until parent gets in + rx.recv().unwrap(); + let &(ref lock, ref cvar) = &*packet2.0; + let mut lock = lock.lock(); + *lock = true; + cvar.notify_one(); + }); + + let &(ref lock, ref cvar) = &*packet.0; + let mut lock = lock.lock(); + tx.send(()).unwrap(); + assert!(!*lock); + while !*lock { + cvar.wait(&mut lock); + } + } + + #[test] + fn test_mutex_arc_nested() { + // Tests nested mutexes and access + // to underlying data. + let arc = Arc::new(Mutex::new(1)); + let arc2 = Arc::new(Mutex::new(arc)); + let (tx, rx) = channel(); + let _t = thread::spawn(move || { + let lock = arc2.lock(); + let lock2 = lock.lock(); + assert_eq!(*lock2, 1); + tx.send(()).unwrap(); + }); + rx.recv().unwrap(); + } + + #[test] + fn test_mutex_arc_access_in_unwind() { + let arc = Arc::new(Mutex::new(1)); + let arc2 = arc.clone(); + let _ = thread::spawn(move || -> () { + struct Unwinder { + i: Arc>, + } + impl Drop for Unwinder { + fn drop(&mut self) { + *self.i.lock() += 1; + } + } + let _u = Unwinder { i: arc2 }; + panic!(); + }) + .join(); + let lock = arc.lock(); + assert_eq!(*lock, 2); + } + + #[test] + fn test_mutex_unsized() { + let mutex: &Mutex<[i32]> = &Mutex::new([1, 2, 3]); + { + let b = &mut *mutex.lock(); + b[0] = 4; + b[2] = 5; + } + let comp: &[i32] = &[4, 2, 5]; + assert_eq!(&*mutex.lock(), comp); + } +} diff --git a/src/once.rs b/src/once.rs new file mode 100644 index 0000000..5b796e4 --- /dev/null +++ b/src/once.rs @@ -0,0 +1,383 @@ +// Copyright 2016 Amanieu d'Antras +// +// Licensed under the Apache License, Version 2.0, or the MIT license , at your option. This file may not be +// copied, modified, or distributed except according to those terms. + +use std::sync::atomic::{AtomicU8, Ordering, fence}; +use std::thread; +use std::mem; +use parking_lot; +use SPIN_LIMIT; + +const DONE_BIT: u8 = 1; +const POISON_BIT: u8 = 2; +const LOCKED_BIT: u8 = 4; +const PARKED_BIT: u8 = 8; + +/// State yielded to the `call_once_force` method which can be used to query +/// whether the `Once` was previously poisoned or not. +pub struct OnceState(bool); + +impl OnceState { + /// Returns whether the associated `Once` has been poisoned. + /// + /// Once an initalization routine for a `Once` has panicked it will forever + /// indicate to future forced initialization routines that it is poisoned. + pub fn poisoned(&self) -> bool { + self.0 + } +} + +/// A synchronization primitive which can be used to run a one-time +/// initialization. Useful for one-time initialization for globals, FFI or +/// related functionality. +/// +/// # Differences from the standard library `Once` +/// +/// - Only requires 1 byte of space, instead of 1 word. +/// - Not required to be `'static`. +/// - Relaxed memory barriers in the fast path, which can significantly improve +/// performance on some architectures. +/// - Efficient handling of micro-contention using adaptive spinning. +/// +/// # Examples +/// +/// ``` +/// # #![feature(const_fn)] +/// use parking_lot::Once; +/// +/// static START: Once = Once::new(); +/// +/// START.call_once(|| { +/// // run initialization here +/// }); +/// ``` +pub struct Once(AtomicU8); + +impl Once { + /// Creates a new `Once` value. + #[inline] + pub const fn new() -> Once { + Once(AtomicU8::new(0)) + } + + /// Performs an initialization routine once and only once. The given closure + /// will be executed if this is the first time `call_once` has been called, + /// and otherwise the routine will *not* be invoked. + /// + /// This method will block the calling thread if another initialization + /// routine is currently running. + /// + /// When this function returns, it is guaranteed that some initialization + /// has run and completed (it may not be the closure specified). It is also + /// guaranteed that any memory writes performed by the executed closure can + /// be reliably observed by other threads at this point (there is a + /// happens-before relation between the closure and code executing after the + /// return). + /// + /// # Examples + /// + /// ``` + /// # #![feature(const_fn)] + /// use parking_lot::Once; + /// + /// static mut VAL: usize = 0; + /// static INIT: Once = Once::new(); + /// + /// // Accessing a `static mut` is unsafe much of the time, but if we do so + /// // in a synchronized fashion (e.g. write once or read all) then we're + /// // good to go! + /// // + /// // This function will only call `expensive_computation` once, and will + /// // otherwise always return the value returned from the first invocation. + /// fn get_cached_val() -> usize { + /// unsafe { + /// INIT.call_once(|| { + /// VAL = expensive_computation(); + /// }); + /// VAL + /// } + /// } + /// + /// fn expensive_computation() -> usize { + /// // ... + /// # 2 + /// } + /// ``` + /// + /// # Panics + /// + /// The closure `f` will only be executed once if this is called + /// concurrently amongst many threads. If that closure panics, however, then + /// it will *poison* this `Once` instance, causing all future invocations of + /// `call_once` to also panic. + #[inline] + pub fn call_once(&self, f: F) + where F: FnOnce() + { + if self.0.load(Ordering::Acquire) == DONE_BIT { + return; + } + + let mut f = Some(f); + self.call_once_slow(false, &mut |_| f.take().unwrap()()); + } + + /// Performs the same function as `call_once` except ignores poisoning. + /// + /// If this `Once` has been poisoned (some initialization panicked) then + /// this function will continue to attempt to call initialization functions + /// until one of them doesn't panic. + /// + /// The closure `f` is yielded a structure which can be used to query the + /// state of this `Once` (whether initialization has previously panicked or + /// not). + #[inline] + pub fn call_once_force(&self, f: F) + where F: FnOnce(OnceState) + { + if self.0.load(Ordering::Acquire) == DONE_BIT { + return; + } + + let mut f = Some(f); + self.call_once_slow(true, &mut |state| f.take().unwrap()(state)); + } + + // This is a non-generic function to reduce the monomorphization cost of + // using `call_once` (this isn't exactly a trivial or small implementation). + // + // Additionally, this is tagged with `#[cold]` as it should indeed be cold + // and it helps let LLVM know that calls to this function should be off the + // fast path. Essentially, this should help generate more straight line code + // in LLVM. + // + // Finally, this takes an `FnMut` instead of a `FnOnce` because there's + // currently no way to take an `FnOnce` and call it via virtual dispatch + // without some allocation overhead. + #[cold] + #[inline(never)] + fn call_once_slow(&self, ignore_poison: bool, f: &mut FnMut(OnceState)) { + let mut spin_count = 0; + let mut state = self.0.load(Ordering::Relaxed); + loop { + // If another thread called the closure, we're done + if state & DONE_BIT != 0 { + // An acquire fence is needed here since we didn't load the + // state with Ordering::Acquire. + fence(Ordering::Acquire); + return; + } + + // If the state has been poisoned and we aren't forcing, then panic + if state & POISON_BIT != 0 && !ignore_poison { + // Need the fence here as well for the same reason + fence(Ordering::Acquire); + panic!("Once instance has previously been poisoned"); + } + + // Grab the lock if it isn't locked, even if there is a queue on it. + // We also clear the poison bit since we are going to try running + // the closure again. + if state & LOCKED_BIT == 0 { + match self.0 + .compare_exchange_weak(state, + (state | LOCKED_BIT) & !POISON_BIT, + Ordering::Acquire, + Ordering::Relaxed) { + Ok(_) => break, + Err(x) => state = x, + } + continue; + } + + // If there is no queue, try spinning a few times + if state & PARKED_BIT == 0 && spin_count < SPIN_LIMIT { + spin_count += 1; + thread::yield_now(); + state = self.0.load(Ordering::Relaxed); + continue; + } + + // Set the parked bit + if state & PARKED_BIT == 0 { + if let Err(x) = self.0.compare_exchange_weak(state, + state | PARKED_BIT, + Ordering::Relaxed, + Ordering::Relaxed) { + state = x; + continue; + } + } + + // Park our thread until we are woken up by the thread that owns the + // lock. + unsafe { + let addr = self as *const _ as usize; + let validate = &mut || self.0.load(Ordering::Relaxed) == LOCKED_BIT | PARKED_BIT; + parking_lot::park(addr, validate, &mut || {}, None); + } + + // Loop back and check if the done bit was set + } + + struct PanicGuard<'a>(&'a Once); + impl<'a> Drop for PanicGuard<'a> { + fn drop(&mut self) { + // Mark the state as poisoned, unlock it and unpark all threads. + let once = self.0; + let state = once.0.swap(POISON_BIT, Ordering::Release); + if state & PARKED_BIT != 0 { + unsafe { + let addr = once as *const _ as usize; + parking_lot::unpark_all(addr); + } + } + } + } + + // At this point we have the lock, so run the closure. Make sure we + // properly clean up if the closure panicks. + let guard = PanicGuard(self); + f(OnceState(state & POISON_BIT != 0)); + mem::forget(guard); + + // Now unlock the state, set the done bit and unpark all threads + let state = self.0.swap(DONE_BIT, Ordering::Release); + if state & PARKED_BIT != 0 { + unsafe { + let addr = self as *const _ as usize; + parking_lot::unpark_all(addr); + } + } + } +} + +impl Default for Once { + fn default() -> Once { + Once::new() + } +} + +#[cfg(test)] +mod tests { + use std::panic; + use std::sync::mpsc::channel; + use std::thread; + use Once; + + #[test] + fn smoke_once() { + static O: Once = Once::new(); + let mut a = 0; + O.call_once(|| a += 1); + assert_eq!(a, 1); + O.call_once(|| a += 1); + assert_eq!(a, 1); + } + + #[test] + fn stampede_once() { + static O: Once = Once::new(); + static mut run: bool = false; + + let (tx, rx) = channel(); + for _ in 0..10 { + let tx = tx.clone(); + thread::spawn(move || { + for _ in 0..4 { + thread::yield_now() + } + unsafe { + O.call_once(|| { + assert!(!run); + run = true; + }); + assert!(run); + } + tx.send(()).unwrap(); + }); + } + + unsafe { + O.call_once(|| { + assert!(!run); + run = true; + }); + assert!(run); + } + + for _ in 0..10 { + rx.recv().unwrap(); + } + } + + #[test] + fn poison_bad() { + static O: Once = Once::new(); + + // poison the once + let t = panic::catch_unwind(|| { + O.call_once(|| panic!()); + }); + assert!(t.is_err()); + + // poisoning propagates + let t = panic::catch_unwind(|| { + O.call_once(|| {}); + }); + assert!(t.is_err()); + + // we can subvert poisoning, however + let mut called = false; + O.call_once_force(|p| { + called = true; + assert!(p.poisoned()) + }); + assert!(called); + + // once any success happens, we stop propagating the poison + O.call_once(|| {}); + } + + #[test] + fn wait_for_force_to_finish() { + static O: Once = Once::new(); + + // poison the once + let t = panic::catch_unwind(|| { + O.call_once(|| panic!()); + }); + assert!(t.is_err()); + + // make sure someone's waiting inside the once via a force + let (tx1, rx1) = channel(); + let (tx2, rx2) = channel(); + let t1 = thread::spawn(move || { + O.call_once_force(|p| { + assert!(p.poisoned()); + tx1.send(()).unwrap(); + rx2.recv().unwrap(); + }); + }); + + rx1.recv().unwrap(); + + // put another waiter on the once + let t2 = thread::spawn(|| { + let mut called = false; + O.call_once(|| { + called = true; + }); + assert!(!called); + }); + + tx2.send(()).unwrap(); + + assert!(t1.join().is_ok()); + assert!(t2.join().is_ok()); + + } +} diff --git a/src/parking_lot.rs b/src/parking_lot.rs new file mode 100644 index 0000000..1688042 --- /dev/null +++ b/src/parking_lot.rs @@ -0,0 +1,469 @@ +// Copyright 2016 Amanieu d'Antras +// +// Licensed under the Apache License, Version 2.0, or the MIT license , at your option. This file may not be +// copied, modified, or distributed except according to those terms. + +use std::sync::atomic::{AtomicUsize, AtomicPtr, Ordering}; +use std::time::Instant; +use std::cell::Cell; +use std::ptr; +use std::mem; +use thread_parker::ThreadParker; +use word_lock::WordLock; + +static NUM_THREADS: AtomicUsize = AtomicUsize::new(0); +static HASHTABLE: AtomicPtr = AtomicPtr::new(ptr::null_mut()); +thread_local!(static THREAD_DATA: ThreadData = ThreadData::new()); + +// Even with 3x more buckets than threads, the memory overhead per thread is +// still only a few hundred bytes per thread. +const LOAD_FACTOR: usize = 3; + +struct HashTable { + // Hash buckets for the table + entries: Box<[Bucket]>, + + // Number of bits used for the hash function + hash_bits: u32, + + // Previous table. This is only kept to keep leak detectors happy. + _prev: *const HashTable, +} + +impl HashTable { + fn new(num_threads: usize, prev: *const HashTable) -> Box { + let new_size = (num_threads * LOAD_FACTOR).next_power_of_two(); + let hash_bits = 0usize.leading_zeros() - new_size.leading_zeros() - 1; + let bucket = Bucket { + mutex: WordLock::new(), + queue_head: Cell::new(ptr::null()), + queue_tail: Cell::new(ptr::null()), + _padding: unsafe { mem::uninitialized() }, + }; + Box::new(HashTable { + entries: vec![bucket; new_size].into_boxed_slice(), + hash_bits: hash_bits, + _prev: prev, + }) + } +} + +struct Bucket { + // Lock protecting the queue + mutex: WordLock, + + // Linked list of threads waiting on this bucket + queue_head: Cell<*const ThreadData>, + queue_tail: Cell<*const ThreadData>, + + // Padding to avoid false sharing between buckets. Ideally we would just + // align the bucket structure to 64 bytes, but Rust doesn't support that yet. + _padding: [u8; 64], +} + +// Implementation of Clone for Bucket, needed to make vec![] work +impl Clone for Bucket { + fn clone(&self) -> Bucket { + Bucket { + mutex: WordLock::new(), + queue_head: Cell::new(ptr::null()), + queue_tail: Cell::new(ptr::null()), + _padding: unsafe { mem::uninitialized() }, + } + } +} + +struct ThreadData { + parker: ThreadParker, + key: Cell, + next_in_queue: Cell<*const ThreadData>, +} + +impl ThreadData { + fn new() -> ThreadData { + // Keep track of the total number of live ThreadData objects and resize + // the hash table accordingly. + let num_threads = NUM_THREADS.fetch_add(1, Ordering::Relaxed) + 1; + unsafe { + grow_hashtable(num_threads); + } + + ThreadData { + parker: ThreadParker::new(), + key: Cell::new(0), + next_in_queue: Cell::new(ptr::null()), + } + } +} + +impl Drop for ThreadData { + fn drop(&mut self) { + NUM_THREADS.fetch_sub(1, Ordering::Relaxed); + } +} + +// Grow the hash table so that it is big enough for the given number of threads. +// This isn't performance-critical since it is only done when a ThreadData is +// created, which only happens once per thread. +unsafe fn grow_hashtable(num_threads: usize) { + // If there is no table, create one + if HASHTABLE.load(Ordering::Relaxed).is_null() { + let new_table = Box::into_raw(HashTable::new(num_threads, ptr::null())); + + // If this fails then it means some other thread created the hash + // table first. + if HASHTABLE.compare_exchange(ptr::null_mut(), + new_table, + Ordering::Release, + Ordering::Relaxed) + .is_ok() { + return; + } + + // Free the table we created + Box::from_raw(new_table); + } + + let mut old_table; + loop { + old_table = HASHTABLE.load(Ordering::Acquire); + + // Check if we need to resize the existing table + if (*old_table).entries.len() >= LOAD_FACTOR * num_threads { + return; + } + + // Lock all buckets in the old table + for b in &(*old_table).entries[..] { + b.mutex.lock(); + } + + // Now check if our table is still the latest one. Another thread could + // have grown the hash table between us reading HASHTABLE and locking + // the buckets. + if HASHTABLE.load(Ordering::Relaxed) == old_table { + break; + } + + // Unlock buckets and try again + for b in &(*old_table).entries[..] { + b.mutex.unlock(); + } + } + + // Create the new table + let new_table = HashTable::new(num_threads, old_table); + + // Move the entries from the old table to the new one + for b in &(*old_table).entries[..] { + let mut current = b.queue_head.get(); + while !current.is_null() { + let next = (*current).next_in_queue.get(); + let hash = hash((*current).key.get(), new_table.hash_bits); + if new_table.entries[hash].queue_tail.get().is_null() { + new_table.entries[hash].queue_head.set(current); + } else { + (*new_table.entries[hash].queue_tail.get()).next_in_queue.set(current); + } + new_table.entries[hash].queue_tail.set(current); + (*current).next_in_queue.set(ptr::null()); + current = next; + } + } + + // Publish the new table. No races are possible at this point because + // any other thread trying to grow the hash table is blocked on the bucket + // locks in the old table. + HASHTABLE.store(Box::into_raw(new_table), Ordering::Release); + + // Unlock all buckets in the old table + for b in &(*old_table).entries[..] { + b.mutex.unlock(); + } +} + +// Hash function for addresses +#[cfg(target_pointer_width = "32")] +fn hash(key: usize, bits: u32) -> usize { + key.wrapping_mul(0x9E3779B9) >> (32 - bits) +} +#[cfg(target_pointer_width = "64")] +fn hash(key: usize, bits: u32) -> usize { + key.wrapping_mul(0x9E3779B97F4A7C15) >> (64 - bits) +} + +// Lock the bucket for the given key +unsafe fn lock_bucket<'a>(key: usize) -> Option<&'a Bucket> { + let mut bucket; + loop { + let hashtable = HASHTABLE.load(Ordering::Acquire); + + // If there is no hash table then there is no bucket to lock + if hashtable.is_null() { + return None; + } + + let hash = hash(key, (*hashtable).hash_bits); + bucket = &(*hashtable).entries[hash]; + + // Lock the bucket + bucket.mutex.lock(); + + // If no other thread has rehashed the table before we grabbed the lock + // then we are good to go! The lock we grabbed prevents any rehashes. + if HASHTABLE.load(Ordering::Relaxed) == hashtable { + return Some(bucket); + } + + // Unlock the bucket and try again + bucket.mutex.unlock(); + } +} + +/// Result of an `unpark_one` operation. +#[derive(Copy, Clone, Eq, PartialEq, Debug)] +pub enum UnparkResult { + /// No parked threads were found for the given key. + NoParkedThreads, + + /// One thread was unparked and it was one in the queue. + UnparkedLast, + + /// One thread was unparked but there are more in the queue. + UnparkedNotLast, +} + +/// Parks the current thread in the queue associated with the given key. +/// +/// The `validate` function is called while the queue is locked and can abort +/// the operation by returning false. If `validate` returns true then the +/// current thread is appended to the queue and the queue is unlocked. +/// +/// The `before_sleep` function is called after the queue is unlocked but before +/// the thread is put to sleep. The thread will then sleep until it is unparked +/// or the given timeout is reached. +/// +/// This function returns `true` if the thread was unparked by a call to +/// `unpark_one` or `unpark_all`, and `false` if the validation function failed +/// or the timeout was reached. +/// +/// # Safety +/// +/// You should only call this function with an address that you control, since +/// you could otherwise interfere with the operation of other synchronization +/// primitives. +/// +/// The `validate` function is called while the queue is locked and must not +/// panic or call into any function in `parking_lot`. +/// +/// The `before_sleep` function is called outside the queue lock and is allowed +/// to call `unpark_one` or `unpark_all`, but it is not allowed to call `park` +/// or panic. +pub unsafe fn park(key: usize, + validate: &mut FnMut() -> bool, + before_sleep: &mut FnMut(), + timeout: Option) + -> bool { + // Grab our thread data, this also ensures that the hash table exists + THREAD_DATA.with(|thread_data| { + // Lock the bucket for the given key + let bucket = lock_bucket(key).unwrap(); + + // If the validation function fails, just return + if !validate() { + bucket.mutex.unlock(); + return false; + } + + // Append our thread data to the queue and unlock the bucket + thread_data.next_in_queue.set(ptr::null()); + thread_data.key.set(key); + thread_data.parker.prepare_park(); + if !bucket.queue_head.get().is_null() { + (*bucket.queue_tail.get()).next_in_queue.set(thread_data); + } else { + bucket.queue_head.set(thread_data); + } + bucket.queue_tail.set(thread_data); + bucket.mutex.unlock(); + + // Invoke the pre-sleep callback + before_sleep(); + + // Park our thread and determine whether we were woken up by an unpark + // or by our timeout. Note that this isn't precise: we can still be + // unparked since we are still in the queue. + let unparked = match timeout { + Some(timeout) => thread_data.parker.park_until(timeout), + None => { + thread_data.parker.park(); + true + } + }; + + // If we were unparked, return now + if unparked { + return true; + } + + // Lock our bucket again. Note that the hashtable may have been rehashed + // in the meantime. + let bucket = lock_bucket(key).unwrap(); + + // Now we need to check again if we were unparked or timed out. Unlike + // the last check this is precise because we hold the bucket lock. + if !thread_data.parker.timed_out() { + bucket.mutex.unlock(); + return true; + } + + // We timed out, so we now need to remove our thread from the queue + let mut link = &bucket.queue_head; + let mut current = bucket.queue_head.get(); + let mut previous = ptr::null(); + while !current.is_null() { + if current == thread_data { + let next = (*current).next_in_queue.get(); + link.set(next); + if bucket.queue_tail.get() == current { + bucket.queue_tail.set(previous); + } + break; + } else { + link = &(*current).next_in_queue; + previous = current; + current = link.get(); + } + } + + // Unlock the bucket, we are done + bucket.mutex.unlock(); + false + }) +} + +/// Unparks one thread from the queue associated with the given key. +/// +/// The `callback` function is called while the queue is locked and before the +/// target thread is woken up. The `UnparkResult` argument to the function +/// indicates whether a thread was found in the queue and whether this was the +/// last thread in the queue. This value is also returned by `unpark_one`. +/// +/// # Safety +/// +/// You should only call this function with an address that you control, since +/// you could otherwise interfere with the operation of other synchronization +/// primitives. +/// +/// The `callback` function is called while the queue is locked and must not +/// panic or call into any function in `parking_lot`. +pub unsafe fn unpark_one(key: usize, callback: &mut FnMut(UnparkResult)) -> UnparkResult { + // Lock the bucket for the given key + let bucket = match lock_bucket(key) { + Some(x) => x, + None => { + // If there is no hash table then there is nothing to unpark + callback(UnparkResult::NoParkedThreads); + return UnparkResult::NoParkedThreads; + } + }; + + // Find a thread with a matching key and remove it from the queue + let mut link = &bucket.queue_head; + let mut current = bucket.queue_head.get(); + let mut previous = ptr::null(); + while !current.is_null() { + if (*current).key.get() == key { + // Remove the thread from the queue + let next = (*current).next_in_queue.get(); + link.set(next); + let mut result = UnparkResult::UnparkedLast; + if bucket.queue_tail.get() == current { + bucket.queue_tail.set(previous); + } else { + // Scan the rest of the queue to see if there are any other + // entries with the given key. + let mut scan = next; + while !scan.is_null() { + if (*scan).key.get() == key { + result = UnparkResult::UnparkedNotLast; + break; + } + scan = (*scan).next_in_queue.get(); + } + } + + // Invoke the callback before waking up the thread + callback(result); + + // Unpark the thread while holding the bucket lock to avoid race + // conditions with timeouts. Once unparked, the thread will act as + // if it was woken up by an unpark even if it reached its timeout. + (*current).parker.unpark(); + bucket.mutex.unlock(); + return result; + } else { + link = &(*current).next_in_queue; + previous = current; + current = link.get(); + } + } + + // No threads with a matching key were found in the bucket + callback(UnparkResult::NoParkedThreads); + bucket.mutex.unlock(); + UnparkResult::NoParkedThreads +} + +/// Unparks all threads in the queue associated with the given key. +/// +/// This function returns the number of threads that were unparked. +/// +/// # Safety +/// +/// You should only call this function with an address that you control, since +/// you could otherwise interfere with the operation of other synchronization +/// primitives. +pub unsafe fn unpark_all(key: usize) -> usize { + // Lock the bucket for the given key + let bucket = match lock_bucket(key) { + Some(x) => x, + // If there is no hash table then there is nothing to unpark + None => return 0, + }; + + // Remove all threads with the given key in the bucket + let mut link = &bucket.queue_head; + let mut current = bucket.queue_head.get(); + let mut previous = ptr::null(); + let mut num_threads = 0; + while !current.is_null() { + if (*current).key.get() == key { + // Remove the thread from the queue + let next = (*current).next_in_queue.get(); + link.set(next); + if bucket.queue_tail.get() == current { + bucket.queue_tail.set(previous); + } + + // Unpark the thread while holding the bucket lock to avoid race + // conditions with timeouts. Once unparked, the thread will act as + // if it was woken up by an unpark even if it reached its timeout. + (*current).parker.unpark(); + + num_threads += 1; + current = next; + } else { + link = &(*current).next_in_queue; + previous = current; + current = link.get(); + } + } + + // Unlock the bucket + bucket.mutex.unlock(); + + num_threads +} diff --git a/src/raw_mutex.rs b/src/raw_mutex.rs new file mode 100644 index 0000000..3afc8de --- /dev/null +++ b/src/raw_mutex.rs @@ -0,0 +1,146 @@ +// Copyright 2016 Amanieu d'Antras +// +// Licensed under the Apache License, Version 2.0, or the MIT license , at your option. This file may not be +// copied, modified, or distributed except according to those terms. + +use std::sync::atomic::{AtomicU8, Ordering}; +use std::thread; +use parking_lot::{self, UnparkResult}; +use SPIN_LIMIT; + +const LOCKED_BIT: u8 = 1; +const PARKED_BIT: u8 = 2; + +pub struct RawMutex { + state: AtomicU8, +} + +impl RawMutex { + #[inline] + pub const fn new() -> RawMutex { + RawMutex { state: AtomicU8::new(0) } + } + + #[inline] + pub fn lock(&self) { + if self.state + .compare_exchange_weak(0, LOCKED_BIT, Ordering::Acquire, Ordering::Relaxed) + .is_ok() { + return; + } + self.lock_slow(); + } + + #[inline] + pub fn try_lock(&self) -> bool { + let mut state = self.state.load(Ordering::Relaxed); + loop { + if state & LOCKED_BIT != 0 { + return false; + } + match self.state.compare_exchange_weak(state, + state | LOCKED_BIT, + Ordering::Acquire, + Ordering::Relaxed) { + Ok(_) => return true, + Err(x) => state = x, + } + } + } + + #[inline] + pub fn unlock(&self) { + if self.state + .compare_exchange_weak(LOCKED_BIT, 0, Ordering::Release, Ordering::Relaxed) + .is_ok() { + return; + } + self.unlock_slow(); + } + + #[cold] + #[inline(never)] + fn lock_slow(&self) { + let mut spin_count = 0; + let mut state = self.state.load(Ordering::Relaxed); + loop { + // Grab the lock if it isn't locked, even if there is a queue on it + if state & LOCKED_BIT == 0 { + match self.state + .compare_exchange_weak(state, + state | LOCKED_BIT, + Ordering::Acquire, + Ordering::Relaxed) { + Ok(_) => return, + Err(x) => state = x, + } + continue; + } + + // If there is no queue, try spinning a few times + if state & PARKED_BIT == 0 && spin_count < SPIN_LIMIT { + spin_count += 1; + thread::yield_now(); + state = self.state.load(Ordering::Relaxed); + continue; + } + + // Set the parked bit + if state & PARKED_BIT == 0 { + if let Err(x) = self.state.compare_exchange_weak(state, + state | PARKED_BIT, + Ordering::Relaxed, + Ordering::Relaxed) { + state = x; + continue; + } + } + + // Park our thread until we are woken up by an unlock + unsafe { + let addr = self as *const _ as usize; + let validate = &mut || { + self.state.load(Ordering::Relaxed) == LOCKED_BIT | PARKED_BIT + }; + parking_lot::park(addr, validate, &mut || {}, None); + } + + // Loop back and try locking again + state = self.state.load(Ordering::Relaxed); + } + } + + #[cold] + #[inline(never)] + fn unlock_slow(&self) { + let mut state = self.state.load(Ordering::Relaxed); + loop { + // Unlock directly if there are no parked threads + if state & PARKED_BIT == 0 { + match self.state + .compare_exchange_weak(LOCKED_BIT, 0, Ordering::Release, Ordering::Relaxed) { + Ok(_) => return, + Err(x) => state = x, + } + continue; + } + + // Unpark one thread and leave the parked bit set if there might + // still be parked threads on this address. + unsafe { + let addr = self as *const _ as usize; + let callback = &mut |result| { + if result == UnparkResult::UnparkedNotLast { + self.state.store(PARKED_BIT, Ordering::Release); + } else { + self.state.store(0, Ordering::Release); + } + }; + parking_lot::unpark_one(addr, callback); + } + break; + } + } +} diff --git a/src/raw_rwlock.rs b/src/raw_rwlock.rs new file mode 100644 index 0000000..ba5d6e2 --- /dev/null +++ b/src/raw_rwlock.rs @@ -0,0 +1,333 @@ +// Copyright 2016 Amanieu d'Antras +// +// Licensed under the Apache License, Version 2.0, or the MIT license , at your option. This file may not be +// copied, modified, or distributed except according to those terms. + +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::thread; +use parking_lot::{self, UnparkResult}; +use SPIN_LIMIT; + +const SHARED_PARKED_BIT: usize = 1; +const EXCLUSIVE_PARKED_BIT: usize = 2; +const EXCLUSIVE_LOCKED_BIT: usize = 4; +const SHARED_COUNT_MASK: usize = !7; +const SHARED_COUNT_INC: usize = 8; + +pub struct RawRwLock { + state: AtomicUsize, +} + +impl RawRwLock { + #[inline] + pub const fn new() -> RawRwLock { + RawRwLock { state: AtomicUsize::new(0) } + } + + #[inline] + pub fn lock_exclusive(&self) { + if self.state + .compare_exchange_weak(0, + EXCLUSIVE_LOCKED_BIT, + Ordering::Acquire, + Ordering::Relaxed) + .is_ok() { + return; + } + self.lock_exclusive_slow(); + } + + #[inline] + pub fn try_lock_exclusive(&self) -> bool { + let mut state = self.state.load(Ordering::Relaxed); + loop { + if state & (EXCLUSIVE_LOCKED_BIT | SHARED_COUNT_MASK) != 0 { + return false; + } + match self.state.compare_exchange_weak(state, + state | EXCLUSIVE_LOCKED_BIT, + Ordering::Acquire, + Ordering::Relaxed) { + Ok(_) => return true, + Err(x) => state = x, + } + } + } + + #[inline] + pub fn unlock_exclusive(&self) { + if self.state + .compare_exchange_weak(EXCLUSIVE_LOCKED_BIT, + 0, + Ordering::Release, + Ordering::Relaxed) + .is_ok() { + return; + } + self.unlock_exclusive_slow(); + } + + #[inline] + pub fn lock_shared(&self) { + let state = self.state.load(Ordering::Relaxed); + if state & (EXCLUSIVE_LOCKED_BIT | EXCLUSIVE_PARKED_BIT) == 0 && + self.state + .compare_exchange_weak(state, + state.checked_add(SHARED_COUNT_INC) + .expect("RwLock shared count overflow"), + Ordering::Acquire, + Ordering::Relaxed) + .is_ok() { + return; + } + self.lock_shared_slow(); + } + + #[inline] + pub fn try_lock_shared(&self) -> bool { + let mut state = self.state.load(Ordering::Relaxed); + loop { + if state & (EXCLUSIVE_LOCKED_BIT | EXCLUSIVE_PARKED_BIT) != 0 { + return false; + } + match self.state.compare_exchange_weak(state, + state.checked_add(SHARED_COUNT_INC) + .expect("RwLock shared count overflow"), + Ordering::Acquire, + Ordering::Relaxed) { + Ok(_) => return true, + Err(x) => state = x, + } + } + } + + #[inline] + pub fn unlock_shared(&self) { + let state = self.state.load(Ordering::Relaxed); + if (state & EXCLUSIVE_PARKED_BIT == 0 || state & SHARED_COUNT_MASK != SHARED_COUNT_INC) && + self.state + .compare_exchange_weak(state, + state - SHARED_COUNT_INC, + Ordering::Release, + Ordering::Relaxed) + .is_ok() { + return; + } + self.unlock_shared_slow(); + } + + #[cold] + #[inline(never)] + pub fn lock_exclusive_slow(&self) { + let mut spin_count = 0; + let mut state = self.state.load(Ordering::Relaxed); + loop { + // Grab the lock if it isn't locked, even if there are other + // threads parked. + if state & (EXCLUSIVE_LOCKED_BIT | SHARED_COUNT_MASK) == 0 { + match self.state + .compare_exchange_weak(state, + state | EXCLUSIVE_LOCKED_BIT, + Ordering::Acquire, + Ordering::Relaxed) { + Ok(_) => return, + Err(x) => state = x, + } + continue; + } + + // If there are no parked exclusive threads, try spinning a few times + if state & EXCLUSIVE_PARKED_BIT == 0 && spin_count < SPIN_LIMIT { + spin_count += 1; + thread::yield_now(); + state = self.state.load(Ordering::Relaxed); + continue; + } + + // Set the parked bit + if state & EXCLUSIVE_PARKED_BIT == 0 { + if let Err(x) = self.state.compare_exchange_weak(state, + state | EXCLUSIVE_PARKED_BIT, + Ordering::Relaxed, + Ordering::Relaxed) { + state = x; + continue; + } + } + + // Park our thread until we are woken up by an unlock + unsafe { + let addr = self as *const _ as usize; + let validate = &mut || { + self.state.load(Ordering::Relaxed) & + (EXCLUSIVE_LOCKED_BIT | EXCLUSIVE_PARKED_BIT) == + EXCLUSIVE_LOCKED_BIT | EXCLUSIVE_PARKED_BIT + }; + if parking_lot::park(addr, validate, &mut || {}, None) { + // If we successfully parked then the lock will be handed + // off to us. + return; + } + } + + // Loop back and try locking again + state = self.state.load(Ordering::Relaxed); + } + } + + #[cold] + #[inline(never)] + pub fn unlock_exclusive_slow(&self) { + let mut state = self.state.load(Ordering::Relaxed); + loop { + // Unlock directly if there are no parked threads + if state & (EXCLUSIVE_PARKED_BIT | SHARED_PARKED_BIT) == 0 { + match self.state + .compare_exchange_weak(EXCLUSIVE_LOCKED_BIT, + 0, + Ordering::Release, + Ordering::Relaxed) { + Ok(_) => return, + Err(x) => state = x, + } + continue; + } + + // If there are exclusive parked threads, hand off the lock to one + // of them without unlocking it. This is needed to avoid writer + // starvation. + if state & EXCLUSIVE_PARKED_BIT != 0 { + unsafe { + let addr = self as *const _ as usize; + let callback = &mut |result| { + // Clear the exclusive parked bit if this was the last thread + if result != UnparkResult::UnparkedNotLast { + self.state.fetch_and(!EXCLUSIVE_PARKED_BIT, Ordering::Relaxed); + } + }; + if parking_lot::unpark_one(addr, callback) != UnparkResult::NoParkedThreads { + // We successfully self.state an exclusive thread and the lock + // has been handed off to it. + return; + } + } + } + + // Release the exclusive lock and clear the shared parked bit + if let Err(x) = self.state + .compare_exchange_weak(EXCLUSIVE_LOCKED_BIT | SHARED_PARKED_BIT, + 0, + Ordering::Release, + Ordering::Relaxed) { + state = x; + continue; + } + + // Unpark all waiting shared threads. + unsafe { + let addr = self as *const _ as usize; + parking_lot::unpark_all(addr + 1); + } + break; + } + } + + #[cold] + #[inline(never)] + pub fn lock_shared_slow(&self) { + let mut spin_count = 0; + let mut state = self.state.load(Ordering::Relaxed); + loop { + // Grab the lock if there are no exclusive threads locked or waiting + if state & (EXCLUSIVE_LOCKED_BIT | EXCLUSIVE_PARKED_BIT) == 0 { + match self.state + .compare_exchange_weak(state, + state.checked_add(SHARED_COUNT_INC) + .expect("RwLock shared count overflow"), + Ordering::Acquire, + Ordering::Relaxed) { + Ok(_) => return, + Err(x) => state = x, + } + continue; + } + + // If there are no parked exclusive threads, try spinning a few times + if state & EXCLUSIVE_PARKED_BIT == 0 && spin_count < SPIN_LIMIT { + spin_count += 1; + thread::yield_now(); + state = self.state.load(Ordering::Relaxed); + continue; + } + + // Set the shared parked bit + if state & SHARED_PARKED_BIT == 0 { + if let Err(x) = self.state.compare_exchange_weak(state, + state | SHARED_PARKED_BIT, + Ordering::Relaxed, + Ordering::Relaxed) { + state = x; + continue; + } + } + + // Park our thread until we are woken up by an unlock + unsafe { + let addr = self as *const _ as usize; + let validate = &mut || { + self.state.load(Ordering::Relaxed) & + (EXCLUSIVE_LOCKED_BIT | SHARED_PARKED_BIT) == + EXCLUSIVE_LOCKED_BIT | SHARED_PARKED_BIT + }; + parking_lot::park(addr + 1, validate, &mut || {}, None); + } + + // Loop back and try locking again + state = self.state.load(Ordering::Relaxed); + } + } + + #[cold] + #[inline(never)] + pub fn unlock_shared_slow(&self) { + let mut state = self.state.load(Ordering::Relaxed); + loop { + // Unlock directly if there are no parked threads or if there are + // still remaining shared threads + if state & EXCLUSIVE_PARKED_BIT == 0 || state & SHARED_COUNT_MASK != SHARED_COUNT_INC { + match self.state + .compare_exchange_weak(state, + state - SHARED_COUNT_INC, + Ordering::Release, + Ordering::Relaxed) { + Ok(_) => return, + Err(x) => state = x, + } + continue; + } + + // At this point we are the last shared thread and we need to wake + // up one exclusive thread to pass the lock to. + unsafe { + let addr = self as *const _ as usize; + let callback = &mut |result| { + // Clear the exclusive parked bit if this was the last thread + if result != UnparkResult::UnparkedNotLast { + self.state.fetch_and(!EXCLUSIVE_PARKED_BIT, Ordering::Relaxed); + } + }; + if parking_lot::unpark_one(addr, callback) != UnparkResult::NoParkedThreads { + // We successfully self.state an exclusive thread and the lock + // has been handed off to it. + return; + } + } + + // There was no exclusive thread to wake up, so just loop back again + // and try to release the lock normally. + state = self.state.load(Ordering::Relaxed); + } + } +} diff --git a/src/rwlock.rs b/src/rwlock.rs new file mode 100644 index 0000000..267e5d0 --- /dev/null +++ b/src/rwlock.rs @@ -0,0 +1,463 @@ +// Copyright 2016 Amanieu d'Antras +// +// Licensed under the Apache License, Version 2.0, or the MIT license , at your option. This file may not be +// copied, modified, or distributed except according to those terms. + +use std::cell::UnsafeCell; +use std::ops::{Deref, DerefMut}; +use std::fmt; +use raw_rwlock::RawRwLock; + +/// A reader-writer lock +/// +/// This type of lock allows a number of readers or at most one writer at any +/// point in time. The write portion of this lock typically allows modification +/// of the underlying data (exclusive access) and the read portion of this lock +/// typically allows for read-only access (shared access). +/// +/// This lock will always prioritize writers over readers to avoid writer +/// starvation. This means that readers trying to acquire the lock will block +/// even if the lock is unlocked when there are writers waiting to acquire the +/// lock. +/// +/// The type parameter `T` represents the data that this lock protects. It is +/// required that `T` satisfies `Send` to be shared across threads and `Sync` to +/// allow concurrent access through readers. The RAII guards returned from the +/// locking methods implement `Deref` (and `DerefMut` for the `write` methods) +/// to allow access to the contained of the lock. +/// +/// # Differences from the standard library `RwLock` +/// +/// - Writer-preferred policy instead of an unspecified platform default. +/// - No poisoning, the lock is released normally on panic. +/// - Only requires 1 word of space, whereas the standard library boxes the +/// `RwLock` due to platform limitations. +/// - A lock guard can be sent to another thread and unlocked there. +/// - Can be statically constructed (requires the `const_fn` nightly feature). +/// - Does not require any drop glue when dropped. +/// - Inline fast path for the uncontended case. +/// - Efficient handling of micro-contention using adaptive spinning. +/// +/// # Examples +/// +/// ``` +/// use parking_lot::RwLock; +/// +/// let lock = RwLock::new(5); +/// +/// // many reader locks can be held at once +/// { +/// let r1 = lock.read(); +/// let r2 = lock.read(); +/// assert_eq!(*r1, 5); +/// assert_eq!(*r2, 5); +/// } // read locks are dropped at this point +/// +/// // only one write lock may be held, however +/// { +/// let mut w = lock.write(); +/// *w += 1; +/// assert_eq!(*w, 6); +/// } // write lock is dropped here +/// ``` +pub struct RwLock { + rwlock: RawRwLock, + data: UnsafeCell, +} + +unsafe impl Send for RwLock {} +unsafe impl Sync for RwLock {} + +/// RAII structure used to release the shared read access of a lock when +/// dropped. +#[must_use] +pub struct RwLockReadGuard<'a, T: ?Sized + 'a> { + rwlock: &'a RawRwLock, + data: &'a T, +} + +/// RAII structure used to release the exclusive write access of a lock when +/// dropped. +#[must_use] +pub struct RwLockWriteGuard<'a, T: ?Sized + 'a> { + rwlock: &'a RawRwLock, + data: &'a mut T, +} + +impl RwLock { + /// Creates a new instance of an `RwLock` which is unlocked. + /// + /// # Examples + /// + /// ``` + /// use parking_lot::RwLock; + /// + /// let lock = RwLock::new(5); + /// ``` + pub const fn new(val: T) -> RwLock { + RwLock { + data: UnsafeCell::new(val), + rwlock: RawRwLock::new(), + } + } + + /// Consumes this `RwLock`, returning the underlying data. + pub fn into_inner(self) -> T { + unsafe { self.data.into_inner() } + } +} + +impl RwLock { + /// Locks this rwlock with shared read access, blocking the current thread + /// until it can be acquired. + /// + /// The calling thread will be blocked until there are no more writers which + /// hold the lock. There may be other readers currently inside the lock when + /// this method returns. This method does not provide any guarantees with + /// respect to the ordering of whether contentious readers or writers will + /// acquire the lock first. + /// + /// Returns an RAII guard which will release this thread's shared access + /// once it is dropped. + pub fn read(&self) -> RwLockReadGuard { + self.rwlock.lock_shared(); + RwLockReadGuard { + rwlock: &self.rwlock, + data: unsafe { &*self.data.get() }, + } + } + + /// Attempts to acquire this rwlock with shared read access. + /// + /// If the access could not be granted at this time, then `Err` is returned. + /// Otherwise, an RAII guard is returned which will release the shared access + /// when it is dropped. + /// + /// This function does not block. + /// + /// This function does not provide any guarantees with respect to the ordering + /// of whether contentious readers or writers will acquire the lock first. + pub fn try_read(&self) -> Option> { + if self.rwlock.try_lock_shared() { + Some(RwLockReadGuard { + rwlock: &self.rwlock, + data: unsafe { &*self.data.get() }, + }) + } else { + None + } + } + + /// Locks this rwlock with exclusive write access, blocking the current + /// thread until it can be acquired. + /// + /// This function will not return while other writers or other readers + /// currently have access to the lock. + /// + /// Returns an RAII guard which will drop the write access of this rwlock + /// when dropped. + pub fn write(&self) -> RwLockWriteGuard { + self.rwlock.lock_exclusive(); + RwLockWriteGuard { + rwlock: &self.rwlock, + data: unsafe { &mut *self.data.get() }, + } + } + + /// Attempts to lock this rwlock with exclusive write access. + /// + /// If the lock could not be acquired at this time, then `Err` is returned. + /// Otherwise, an RAII guard is returned which will release the lock when + /// it is dropped. + /// + /// This function does not block. + /// + /// This function does not provide any guarantees with respect to the ordering + /// of whether contentious readers or writers will acquire the lock first. + pub fn try_write(&self) -> Option> { + if self.rwlock.try_lock_exclusive() { + Some(RwLockWriteGuard { + rwlock: &self.rwlock, + data: unsafe { &mut *self.data.get() }, + }) + } else { + None + } + } + + /// Returns a mutable reference to the underlying data. + /// + /// Since this call borrows the `RwLock` mutably, no actual locking needs to + /// take place---the mutable borrow statically guarantees no locks exist. + pub fn get_mut(&mut self) -> &mut T { + unsafe { &mut *self.data.get() } + } +} + +impl Default for RwLock { + fn default() -> RwLock { + RwLock::new(Default::default()) + } +} + +impl fmt::Debug for RwLock { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match self.try_read() { + Some(guard) => write!(f, "RwLock {{ data: {:?} }}", &*guard), + None => write!(f, "RwLock {{ }}"), + } + } +} + +impl<'a, T: ?Sized + 'a> Deref for RwLockReadGuard<'a, T> { + type Target = T; + fn deref(&self) -> &T { + self.data + } +} + +impl<'a, T: ?Sized + 'a> Drop for RwLockReadGuard<'a, T> { + fn drop(&mut self) { + self.rwlock.unlock_shared(); + } +} + +impl<'a, T: ?Sized + 'a> Deref for RwLockWriteGuard<'a, T> { + type Target = T; + fn deref(&self) -> &T { + self.data + } +} + +impl<'a, T: ?Sized + 'a> DerefMut for RwLockWriteGuard<'a, T> { + fn deref_mut(&mut self) -> &mut T { + self.data + } +} + +impl<'a, T: ?Sized + 'a> Drop for RwLockWriteGuard<'a, T> { + fn drop(&mut self) { + self.rwlock.unlock_exclusive(); + } +} + +#[cfg(test)] +mod tests { + extern crate rand; + use self::rand::Rng; + use std::sync::mpsc::channel; + use std::thread; + use std::sync::Arc; + use std::sync::atomic::{AtomicUsize, Ordering}; + use RwLock; + + #[derive(Eq, PartialEq, Debug)] + struct NonCopy(i32); + + #[test] + fn smoke() { + let l = RwLock::new(()); + drop(l.read()); + drop(l.write()); + drop((l.read(), l.read())); + drop(l.write()); + } + + #[test] + fn frob() { + static R: RwLock<()> = RwLock::new(()); + const N: u32 = 10; + const M: u32 = 1000; + + let (tx, rx) = channel::<()>(); + for _ in 0..N { + let tx = tx.clone(); + thread::spawn(move || { + let mut rng = rand::thread_rng(); + for _ in 0..M { + if rng.gen_weighted_bool(N) { + drop(R.write()); + } else { + drop(R.read()); + } + } + drop(tx); + }); + } + drop(tx); + let _ = rx.recv(); + } + + #[test] + fn test_rw_arc_no_poison_wr() { + let arc = Arc::new(RwLock::new(1)); + let arc2 = arc.clone(); + let _: Result<(), _> = thread::spawn(move || { + let _lock = arc2.write(); + panic!(); + }) + .join(); + let lock = arc.read(); + assert_eq!(*lock, 1); + } + + #[test] + fn test_rw_arc_no_poison_ww() { + let arc = Arc::new(RwLock::new(1)); + let arc2 = arc.clone(); + let _: Result<(), _> = thread::spawn(move || { + let _lock = arc2.write(); + panic!(); + }) + .join(); + let lock = arc.write(); + assert_eq!(*lock, 1); + } + + #[test] + fn test_rw_arc_no_poison_rr() { + let arc = Arc::new(RwLock::new(1)); + let arc2 = arc.clone(); + let _: Result<(), _> = thread::spawn(move || { + let _lock = arc2.read(); + panic!(); + }) + .join(); + let lock = arc.read(); + assert_eq!(*lock, 1); + } + #[test] + fn test_rw_arc_no_poison_rw() { + let arc = Arc::new(RwLock::new(1)); + let arc2 = arc.clone(); + let _: Result<(), _> = thread::spawn(move || { + let _lock = arc2.read(); + panic!() + }) + .join(); + let lock = arc.write(); + assert_eq!(*lock, 1); + } + + #[test] + fn test_rw_arc() { + let arc = Arc::new(RwLock::new(0)); + let arc2 = arc.clone(); + let (tx, rx) = channel(); + + thread::spawn(move || { + let mut lock = arc2.write(); + for _ in 0..10 { + let tmp = *lock; + *lock = -1; + thread::yield_now(); + *lock = tmp + 1; + } + tx.send(()).unwrap(); + }); + + // Readers try to catch the writer in the act + let mut children = Vec::new(); + for _ in 0..5 { + let arc3 = arc.clone(); + children.push(thread::spawn(move || { + let lock = arc3.read(); + assert!(*lock >= 0); + })); + } + + // Wait for children to pass their asserts + for r in children { + assert!(r.join().is_ok()); + } + + // Wait for writer to finish + rx.recv().unwrap(); + let lock = arc.read(); + assert_eq!(*lock, 10); + } + + #[test] + fn test_rw_arc_access_in_unwind() { + let arc = Arc::new(RwLock::new(1)); + let arc2 = arc.clone(); + let _ = thread::spawn(move || -> () { + struct Unwinder { + i: Arc>, + } + impl Drop for Unwinder { + fn drop(&mut self) { + let mut lock = self.i.write(); + *lock += 1; + } + } + let _u = Unwinder { i: arc2 }; + panic!(); + }) + .join(); + let lock = arc.read(); + assert_eq!(*lock, 2); + } + + #[test] + fn test_rwlock_unsized() { + let rw: &RwLock<[i32]> = &RwLock::new([1, 2, 3]); + { + let b = &mut *rw.write(); + b[0] = 4; + b[2] = 5; + } + let comp: &[i32] = &[4, 2, 5]; + assert_eq!(&*rw.read(), comp); + } + + #[test] + fn test_rwlock_try_write() { + let lock = RwLock::new(0isize); + let read_guard = lock.read(); + + let write_result = lock.try_write(); + match write_result { + None => (), + Some(_) => { + assert!(false, + "try_write should not succeed while read_guard is in scope") + } + } + + drop(read_guard); + } + + #[test] + fn test_into_inner() { + let m = RwLock::new(NonCopy(10)); + assert_eq!(m.into_inner(), NonCopy(10)); + } + + #[test] + fn test_into_inner_drop() { + struct Foo(Arc); + impl Drop for Foo { + fn drop(&mut self) { + self.0.fetch_add(1, Ordering::SeqCst); + } + } + let num_drops = Arc::new(AtomicUsize::new(0)); + let m = RwLock::new(Foo(num_drops.clone())); + assert_eq!(num_drops.load(Ordering::SeqCst), 0); + { + let _inner = m.into_inner(); + assert_eq!(num_drops.load(Ordering::SeqCst), 0); + } + assert_eq!(num_drops.load(Ordering::SeqCst), 1); + } + + #[test] + fn test_get_mut() { + let mut m = RwLock::new(NonCopy(10)); + *m.get_mut() = NonCopy(20); + assert_eq!(m.into_inner(), NonCopy(20)); + } +} diff --git a/src/thread_parker.rs b/src/thread_parker.rs new file mode 100644 index 0000000..79c021e --- /dev/null +++ b/src/thread_parker.rs @@ -0,0 +1,74 @@ +// Copyright 2016 Amanieu d'Antras +// +// Licensed under the Apache License, Version 2.0, or the MIT license , at your option. This file may not be +// copied, modified, or distributed except according to those terms. + +use std::sync::{Mutex, Condvar}; +use std::cell::Cell; +use std::time::Instant; + +// Helper type for putting a thread to sleep until some other thread wakes it up +pub struct ThreadParker { + should_park: Cell, + mutex: Mutex<()>, + condvar: Condvar, +} + +impl ThreadParker { + pub fn new() -> ThreadParker { + ThreadParker { + should_park: Cell::new(false), + mutex: Mutex::new(()), + condvar: Condvar::new(), + } + } + + // Prepares the parker. This should be called before adding it to the queue. + pub fn prepare_park(&self) { + self.should_park.set(true); + } + + // Checks if the park timed out. This should be called while holding the + // queue lock after park_until has returned false. + pub fn timed_out(&self) -> bool { + self.should_park.get() + } + + // Parks the thread until it is unparked. This should be called after it has + // been added to the queue, after unlocking the queue. + pub fn park(&self) { + let mut lock = self.mutex.lock().unwrap(); + while self.should_park.get() { + lock = self.condvar.wait(lock).unwrap(); + } + } + + // Parks the thread until it is unparked or the timeout is reached. This + // should be called after it has been added to the queue, after unlocking + // the queue. Returns true if we were unparked and false if we timed out. + pub fn park_until(&self, timeout: Instant) -> bool { + let mut lock = self.mutex.lock().unwrap(); + while self.should_park.get() { + let now = Instant::now(); + if timeout <= now { + return false; + } + let (new_lock, _) = self.condvar.wait_timeout(lock, timeout - now).unwrap(); + lock = new_lock; + } + true + } + + // Wakes up the parker. This should be called while holding the queue lock. + pub fn unpark(&self) { + let _lock = self.mutex.lock().unwrap(); + self.should_park.set(false); + + // We notify while holding the lock here to avoid races with the target + // thread. In particular, the thread could exit after we unlock the + // mutex, which would make the condvar access invalid memory. + self.condvar.notify_one(); + } +} diff --git a/src/word_lock.rs b/src/word_lock.rs new file mode 100644 index 0000000..9b6eebb --- /dev/null +++ b/src/word_lock.rs @@ -0,0 +1,187 @@ +// Copyright 2016 Amanieu d'Antras +// +// Licensed under the Apache License, Version 2.0, or the MIT license , at your option. This file may not be +// copied, modified, or distributed except according to those terms. + +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::thread; +use std::ptr; +use std::mem; +use std::cell::Cell; +use thread_parker::ThreadParker; +use SPIN_LIMIT; + +struct ThreadData { + parker: ThreadParker, + next_in_queue: Cell<*const ThreadData>, + + // To make everything fit in 1 word we cheat by putting the tail pointer of + // the linked list in the first element of the queue. + queue_tail: Cell<*const ThreadData>, +} + +impl ThreadData { + fn new() -> ThreadData { + ThreadData { + parker: ThreadParker::new(), + next_in_queue: Cell::new(ptr::null()), + queue_tail: Cell::new(ptr::null()), + } + } +} + +thread_local!(static THREAD_DATA: ThreadData = ThreadData::new()); + +const LOCKED_BIT: usize = 1; +const QUEUE_LOCKED_BIT: usize = 2; +const QUEUE_MASK: usize = !3; + +// Word-sized lock that is used to implement the parking_lot API. Since this +// can't used parking_lot, it instead manages its own queue of waiting threads. +pub struct WordLock { + state: AtomicUsize, +} + +impl WordLock { + #[inline] + pub const fn new() -> WordLock { + WordLock { state: AtomicUsize::new(0) } + } + + #[inline] + pub unsafe fn lock(&self) { + if self.state + .compare_exchange_weak(0, LOCKED_BIT, Ordering::Acquire, Ordering::Relaxed) + .is_ok() { + return; + } + self.lock_slow(); + } + + #[inline] + pub unsafe fn unlock(&self) { + if self.state + .compare_exchange_weak(LOCKED_BIT, 0, Ordering::Release, Ordering::Relaxed) + .is_ok() { + return; + } + self.unlock_slow(); + } + + #[inline(never)] + unsafe fn lock_slow(&self) { + let mut spin_count = 0; + let mut state = self.state.load(Ordering::Relaxed); + loop { + // Grab the lock if it isn't locked, even if there is a queue on it + if state & LOCKED_BIT == 0 { + match self.state + .compare_exchange_weak(state, + state | LOCKED_BIT, + Ordering::Acquire, + Ordering::Relaxed) { + Ok(_) => return, + Err(x) => state = x, + } + continue; + } + + // If there is no queue, try spinning a few times + if state & QUEUE_MASK == 0 && spin_count < SPIN_LIMIT { + spin_count += 1; + thread::yield_now(); + state = self.state.load(Ordering::Relaxed); + continue; + } + + // Spin if the queue is locked + if state & QUEUE_LOCKED_BIT != 0 { + thread::yield_now(); + state = self.state.load(Ordering::Relaxed); + continue; + } + + // Try locking the queue + if let Err(x) = self.state + .compare_exchange_weak(state, + state | QUEUE_LOCKED_BIT, + Ordering::Acquire, + Ordering::Relaxed) { + state = x; + continue; + } + + // Get our thread data + THREAD_DATA.with(|thread_data| { + assert!(mem::align_of_val(thread_data) > !QUEUE_MASK); + + // Add our thread to the queue and unlock the queue + thread_data.next_in_queue.set(ptr::null()); + thread_data.parker.prepare_park(); + let mut queue_head = (state & QUEUE_MASK) as *const ThreadData; + if !queue_head.is_null() { + (*(*queue_head).queue_tail.get()).next_in_queue.set(thread_data); + } else { + queue_head = thread_data; + } + (*queue_head).queue_tail.set(thread_data); + self.state.store((queue_head as usize) | LOCKED_BIT, Ordering::Release); + + // Sleep until we are woken up by an unlock + thread_data.parker.park(); + }); + + self.state.load(Ordering::Relaxed); + } + } + + #[inline(never)] + unsafe fn unlock_slow(&self) { + let queue_head; + let mut state = self.state.load(Ordering::Relaxed); + loop { + // Unlock directly if there is no queue + if state == LOCKED_BIT { + match self.state + .compare_exchange_weak(LOCKED_BIT, 0, Ordering::Release, Ordering::Relaxed) { + Ok(_) => return, + Err(x) => state = x, + } + continue; + } + + // Spin if the queue is locked + if state & QUEUE_LOCKED_BIT != 0 { + thread::yield_now(); + state = self.state.load(Ordering::Relaxed); + continue; + } + + // Try locking the queue + match self.state + .compare_exchange_weak(state, + state | QUEUE_LOCKED_BIT, + Ordering::Acquire, + Ordering::Relaxed) { + Ok(_) => { + queue_head = (state & QUEUE_MASK) as *mut ThreadData; + break; + } + Err(x) => state = x, + } + } + + // At this point the queue is locked and must be non-empty. First remove + // the first entry in the queue. + let new_queue_head = (*queue_head).next_in_queue.get(); + if !new_queue_head.is_null() { + (*new_queue_head).queue_tail.set((*queue_head).queue_tail.get()); + } + self.state.store(new_queue_head as usize, Ordering::Release); + + // Then wake up the thread we removed from the queue + (*queue_head).parker.unpark(); + } +}