Initial commit

This commit is contained in:
Amanieu d'Antras
2016-05-11 19:26:54 +01:00
commit 5cc3a5dd40
16 changed files with 3259 additions and 0 deletions
+2
View File
@@ -0,0 +1,2 @@
target
Cargo.lock
+25
View File
@@ -0,0 +1,25 @@
language: rust
sudo: false
rust:
- nightly
before_script:
- |
pip install 'travis-cargo<0.2' --user &&
export PATH=$HOME/.local/bin:$PATH
script:
- travis-cargo build
- travis-cargo test
- cargo doc --no-deps
after_success:
- travis-cargo --only nightly doc-upload
env:
global:
- TRAVIS_CARGO_NIGHTLY_FEATURE=""
notifications:
email: false
+13
View File
@@ -0,0 +1,13 @@
[package]
name = "parking_lot"
version = "0.1.0"
authors = ["Amanieu d'Antras <amanieu@gmail.com>"]
description = "Compact and efficient synchronization primitives. Also provides an API for creating custom synchronization primitives."
documentation = "https://amanieu.github.io/parking_lot/parking_lot/index.html"
license = "Apache-2.0/MIT"
repository = "https://github.com/Amanieu/parking_lot"
readme = "README.md"
keywords = ["mutex", "condvar", "rwlock", "once", "thread"]
[dev-dependencies]
rand = "0.3"
+201
View File
@@ -0,0 +1,201 @@
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS
APPENDIX: How to apply the Apache License to your work.
To apply the Apache License to your work, attach the following
boilerplate notice, with the fields enclosed by brackets "[]"
replaced with your own identifying information. (Don't include
the brackets!) The text should be enclosed in the appropriate
comment syntax for the file format. We also recommend that a
file or class name and description of purpose be included on the
same "printed page" as the copyright notice for easier
identification within third-party archives.
Copyright [yyyy] [name of copyright owner]
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
+25
View File
@@ -0,0 +1,25 @@
Copyright (c) 2016 The Rust Project Developers
Permission is hereby granted, free of charge, to any
person obtaining a copy of this software and associated
documentation files (the "Software"), to deal in the
Software without restriction, including without
limitation the rights to use, copy, modify, merge,
publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software
is furnished to do so, subject to the following
conditions:
The above copyright notice and this permission notice
shall be included in all copies or substantial portions
of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
DEALINGS IN THE SOFTWARE.
+130
View File
@@ -0,0 +1,130 @@
parking_lot
============
[![Build Status](https://travis-ci.org/Amanieu/parking_lot.svg?branch=master)](https://travis-ci.org/Amanieu/parking_lot) [![Crates.io](https://img.shields.io/crates/v/parking_lot.svg)](https://crates.io/crates/parking_lot)
[Documentation](https://amanieu.github.io/parking_lot/parking_lot/index.html)
This library provides implementations of `Mutex`, `RwLock`, `Condvar` and
`Once` that are smaller, faster and more flexible than those in the Rust
standard library. It also exposes a low-level API for creating your own
efficient synchronization primitives.
## Features
The primitives provided by this library have several advantages over those
in the Rust standard library:
1. `Mutex`, `Condvar` and `Once` only require 1 byte of storage space, and
`RwLock` only requires 1 word of storage space. On the other hand the
standard library primitives require a dynamically allocated `Box` to hold
OS-specific synchronization primitives. The small size of `Mutex` in
particular encourages the use of fine-grained locks to increase
parallelism.
2. Since they consist of just a single atomic variable, have constant
initializers and don't need destructors, these primitives can be used as
`static` global variables. The standard library primitives require
dynamic initialization and thus need to be lazily initialized with
`lazy_static!`.
3. Uncontended lock acquisition and release is done through fast inline
paths which only require a single atomic operation.
4. Microcontention (a contended lock with a short critical section) is
efficiently handled by spinning a few times while trying to acquire a
lock.
5. The locks are adaptive and will suspend a thread after a few failed spin
attempts. This makes the locks suitable for both long and short critical
sections.
## The parking lot
To keep these primitives small, all thread queuing and suspending
functionality is offloaded to the *parking lot*. The idea behind this is
based on the Webkit [`WTF::ParkingLot`]
(https://webkit.org/blog/6161/locking-in-webkit/) class, which essentially
consists of a hash table mapping of lock addresses to queues of parked
(sleeping) threads. The Webkit parking lot was itself inspired by Linux
[futexes](http://man7.org/linux/man-pages/man2/futex.2.html), but it is more
powerful since it allows invoking callbacks while holding a queue lock.
*Parking* refers to suspending the thread while simultaneously enqueuing it
on a queue keyed by some address. *Unparking* refers to dequeuing a thread
from a queue keyed by some address and resuming it. The parking lot API
consists of just 3 functions:
```rust,ignore
unsafe fn park(key: usize,
validate: &mut FnMut() -> bool,
before_sleep: &mut FnMut(),
timeout: Option<Instant>)
-> bool
```
This function performs the following steps:
1. Lock the queue associated with `key`.
2. Call `validate`, if it returns `false`, unlock the queue and return.
3. Add the current thread to the queue.
4. Unlock the queue.
5. Call `before_sleep`.
6. Sleep until we are unparked or `timeout` is reached.
7. Return `true` if we were unparked by another thread, `false` otherwise.
```rust,ignore
unsafe fn unpark_one(key: usize,
callback: &mut FnMut(UnparkResult))
-> UnparkResult
```
This function will unpark a single thread from the queue associated with
`key`. The `callback` function is invoked while holding the queue lock but
before the thread is unparked. The `UnparkResult` indicates whether the
queue was empty and, if not, whether there are still threads remaining in
the queue.
```rust,ignore
unsafe fn unpark_all(key: usize) -> usize
```
This function will unpark all threads in the queue associated with `key`. It
returns the number of threads that were unparked.
## Building custom synchronization primitives
Building custom synchronization primitives is very simple since
`parking_lot` takes care of all the hard parts for you. The most commmon
case for a custom primitive would be to integrate a `Mutex` inside another
data type. Since a mutex only requires 2 bits, it can share space with other
data. For example, one could create an `ArcMutex` type that combines the
atomic reference count and the two mutex bits in the same atomic word.
## Usage
This crate currently requires a nightly Rust compiler.
Add this to your `Cargo.toml`:
```toml
[dependencies]
parking_lot = "0.1"
```
and this to your crate root:
```rust
extern crate parking_lot;
```
## License
Licensed under either of
* Apache License, Version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or http://www.apache.org/licenses/LICENSE-2.0)
* MIT license ([LICENSE-MIT](LICENSE-MIT) or http://opensource.org/licenses/MIT)
at your option.
### Contribution
Unless you explicitly state otherwise, any contribution intentionally submitted
for inclusion in the work by you, as defined in the Apache-2.0 license, shall be dual licensed as above, without any
additional terms or conditions.
+334
View File
@@ -0,0 +1,334 @@
// Copyright 2016 Amanieu d'Antras
//
// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
// http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
// http://opensource.org/licenses/MIT>, at your option. This file may not be
// copied, modified, or distributed except according to those terms.
use std::sync::atomic::{AtomicU8, Ordering};
use std::time::{Duration, Instant};
use parking_lot::{self, UnparkResult};
use mutex::{MutexGuard, guard_lock};
/// A type indicating whether a timed wait on a condition variable returned
/// due to a time out or not.
#[derive(Debug, PartialEq, Eq, Copy, Clone)]
pub struct WaitTimeoutResult(bool);
impl WaitTimeoutResult {
/// Returns whether the wait was known to have timed out.
pub fn timed_out(&self) -> bool {
self.0
}
}
/// A Condition Variable
///
/// Condition variables represent the ability to block a thread such that it
/// consumes no CPU time while waiting for an event to occur. Condition
/// variables are typically associated with a boolean predicate (a condition)
/// and a mutex. The predicate is always verified inside of the mutex before
/// determining that thread must block.
///
/// # Differences from the standard library `Condvar`
///
/// - No spurious wakeups: A wait will only return a non-timeout result if it
/// was woken up by `notify_one` or `notify_all`.
/// - Only requires 1 byte of space, whereas the standard library boxes the
/// `Condvar` due to platform limitations.
/// - Can be statically constructed (requires the `const_fn` nightly feature).
/// - Does not require any drop glue when dropped.
/// - Inline fast path for the uncontended case.
///
/// # Examples
///
/// ```
/// use parking_lot::{Mutex, Condvar};
/// use std::sync::Arc;
/// use std::thread;
///
/// let pair = Arc::new((Mutex::new(false), Condvar::new()));
/// let pair2 = pair.clone();
///
/// // Inside of our lock, spawn a new thread, and then wait for it to start
/// thread::spawn(move|| {
/// let &(ref lock, ref cvar) = &*pair2;
/// let mut started = lock.lock();
/// *started = true;
/// cvar.notify_one();
/// });
///
/// // wait for the thread to start up
/// let &(ref lock, ref cvar) = &*pair;
/// let mut started = lock.lock();
/// while !*started {
/// cvar.wait(&mut started);
/// }
/// ```
pub struct Condvar {
state: AtomicU8,
}
impl Condvar {
/// Creates a new condition variable which is ready to be waited on and
/// notified.
#[inline]
pub const fn new() -> Condvar {
Condvar { state: AtomicU8::new(0) }
}
/// Wakes up one blocked thread on this condvar.
///
/// If there is a blocked thread on this condition variable, then it will
/// be woken up from its call to `wait` or `wait_timeout`. Calls to
/// `notify_one` are not buffered in any way.
///
/// To wake up all threads, see `notify_all()`.
#[inline]
pub fn notify_one(&self) {
// Nothing to do if there are no waiting threads
if self.state.load(Ordering::Relaxed) == 0 {
return;
}
unsafe {
// Unpark one thread
let addr = self as *const _ as usize;
let callback = &mut |result| {
// Clear our state if there are no more waiting threads
if result != UnparkResult::UnparkedNotLast {
self.state.store(0, Ordering::Relaxed);
}
};
parking_lot::unpark_one(addr, callback);
}
}
/// Wakes up all blocked threads on this condvar.
///
/// This method will ensure that any current waiters on the condition
/// variable are awoken. Calls to `notify_all()` are not buffered in any
/// way.
///
/// To wake up only one thread, see `notify_one()`.
#[inline]
pub fn notify_all(&self) {
// Nothing to do if there are no waiting threads
if self.state.load(Ordering::Relaxed) == 0 {
return;
}
// Clear our state since we are going to wake all threads up anyways
self.state.store(0, Ordering::Relaxed);
unsafe {
// Unpark all threads
let addr = self as *const _ as usize;
parking_lot::unpark_all(addr);
}
}
/// Blocks the current thread until this condition variable receives a
/// notification.
///
/// This function will atomically unlock the mutex specified (represented by
/// `mutex_guard`) and block the current thread. This means that any calls
/// to `notify_*()` which happen logically after the mutex is unlocked are
/// candidates to wake this thread up. When this function call returns, the
/// lock specified will have been re-acquired.
#[inline]
pub fn wait<T: ?Sized>(&self, guard: &mut MutexGuard<T>) {
unsafe {
let addr = self as *const _ as usize;
let validate = &mut || {
// This is done while locked to avoid races with notify_one
self.state.store(1, Ordering::Relaxed);
true
};
let before_sleep = &mut || {
// Unlock the mutex before sleeping...
guard_lock(guard).unlock();
};
parking_lot::park(addr, validate, before_sleep, None);
// ... and re-lock it once we are done sleeping
guard_lock(guard).lock();
}
}
/// Waits on this condition variable for a notification, timing out after
/// the specified time instant.
///
/// The semantics of this function are equivalent to `wait()` except that
/// the thread will be blocked roughly until `timeout` is reached. This
/// method should not be used for precise timing due to anomalies such as
/// preemption or platform differences that may not cause the maximum
/// amount of time waited to be precisely `timeout`.
///
/// The returned `WaitTimeoutResult` value indicates if the timeout is
/// known to have elapsed.
///
/// Like `wait`, the lock specified will be re-acquired when this function
/// returns, regardless of whether the timeout elapsed or not.
#[inline]
pub fn wait_until<T: ?Sized>(&self,
guard: &mut MutexGuard<T>,
timeout: Instant)
-> WaitTimeoutResult {
unsafe {
let result;
if timeout <= Instant::now() {
// If the timeout is in the past, we still need to release and
// re-acquire the mutex.
guard_lock(guard).unlock();
result = false;
} else {
let addr = self as *const _ as usize;
let validate = &mut || {
// This is done while locked to avoid races with notify_one
self.state.store(1, Ordering::Relaxed);
true
};
let before_sleep = &mut || {
// Unlock the mutex before sleeping...
guard_lock(guard).unlock();
};
result = parking_lot::park(addr, validate, before_sleep, Some(timeout));
}
// ... and re-lock it once we are done sleeping
guard_lock(guard).lock();
WaitTimeoutResult(!result)
}
}
/// Waits on this condition variable for a notification, timing out after a
/// specified duration.
///
/// The semantics of this function are equivalent to `wait()` except that
/// the thread will be blocked for roughly no longer than `timeout`. This
/// method should not be used for precise timing due to anomalies such as
/// preemption or platform differences that may not cause the maximum
/// amount of time waited to be precisely `timeout`.
///
/// The returned `WaitTimeoutResult` value indicates if the timeout is
/// known to have elapsed.
///
/// Like `wait`, the lock specified will be re-acquired when this function
/// returns, regardless of whether the timeout elapsed or not.
#[inline]
pub fn wait_for<T: ?Sized>(&self,
guard: &mut MutexGuard<T>,
timeout: Duration)
-> WaitTimeoutResult {
self.wait_until(guard, Instant::now() + timeout)
}
}
impl Default for Condvar {
fn default() -> Condvar {
Condvar::new()
}
}
#[cfg(test)]
mod tests {
use std::sync::mpsc::channel;
use std::sync::Arc;
use std::thread;
use std::time::{Duration, Instant};
use {Condvar, Mutex};
#[test]
fn smoke() {
let c = Condvar::new();
c.notify_one();
c.notify_all();
}
#[test]
fn notify_one() {
static C: Condvar = Condvar::new();
static M: Mutex<()> = Mutex::new(());
let mut g = M.lock();
let _t = thread::spawn(move || {
let _g = M.lock();
C.notify_one();
});
C.wait(&mut g);
}
#[test]
fn notify_all() {
const N: usize = 10;
let data = Arc::new((Mutex::new(0), Condvar::new()));
let (tx, rx) = channel();
for _ in 0..N {
let data = data.clone();
let tx = tx.clone();
thread::spawn(move || {
let &(ref lock, ref cond) = &*data;
let mut cnt = lock.lock();
*cnt += 1;
if *cnt == N {
tx.send(()).unwrap();
}
while *cnt != 0 {
cond.wait(&mut cnt);
}
tx.send(()).unwrap();
});
}
drop(tx);
let &(ref lock, ref cond) = &*data;
rx.recv().unwrap();
let mut cnt = lock.lock();
*cnt = 0;
cond.notify_all();
drop(cnt);
for _ in 0..N {
rx.recv().unwrap();
}
}
#[test]
fn wait_for() {
static C: Condvar = Condvar::new();
static M: Mutex<()> = Mutex::new(());
let mut g = M.lock();
let no_timeout = C.wait_for(&mut g, Duration::from_millis(1));
assert!(no_timeout.timed_out());
let _t = thread::spawn(move || {
let _g = M.lock();
C.notify_one();
});
let timeout_res = C.wait_for(&mut g, Duration::from_millis(u32::max_value() as u64));
assert!(!timeout_res.timed_out());
drop(g);
}
#[test]
fn wait_until() {
static C: Condvar = Condvar::new();
static M: Mutex<()> = Mutex::new(());
let mut g = M.lock();
let no_timeout = C.wait_until(&mut g, Instant::now() + Duration::from_millis(1));
assert!(no_timeout.timed_out());
let _t = thread::spawn(move || {
let _g = M.lock();
C.notify_one();
});
let timeout_res = C.wait_until(&mut g,
Instant::now() +
Duration::from_millis(u32::max_value() as u64));
assert!(!timeout_res.timed_out());
drop(g);
}
}
+120
View File
@@ -0,0 +1,120 @@
// Copyright 2016 Amanieu d'Antras
//
// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
// http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
// http://opensource.org/licenses/MIT>, at your option. This file may not be
// copied, modified, or distributed except according to those terms.
//! This library provides implementations of `Mutex`, `RwLock`, `Condvar` and
//! `Once` that are smaller, faster and more flexible than those in the Rust
//! standard library. It also exposes a low-level API for creating your own
//! efficient synchronization primitives.
//!
//! # Features
//!
//! The primitives provided by this library have several advantages over those
//! in the Rust standard library:
//!
//! 1. `Mutex`, `Condvar` and `Once` only require 1 byte of storage space, and
//! `RwLock` only requires 1 word of storage space. On the other hand the
//! standard library primitives require a dynamically allocated `Box` to hold
//! OS-specific synchronization primitives. The small size of `Mutex` in
//! particular encourages the use of fine-grained locks to increase
//! parallelism.
//! 2. Since they consist of just a single atomic variable, have constant
//! initializers and don't need destructors, these primitives can be used as
//! `static` global variables. The standard library primitives require
//! dynamic initialization and thus need to be lazily initialized with
//! `lazy_static!`.
//! 3. Uncontended lock acquisition and release is done through fast inline
//! paths which only require a single atomic operation.
//! 4. Microcontention (a contended lock with a short critical section) is
//! efficiently handled by spinning a few times while trying to acquire a
//! lock.
//! 5. The locks are adaptive and will suspend a thread after a few failed spin
//! attempts. This makes the locks suitable for both long and short critical
//! sections.
//!
//! # The parking lot
//!
//! To keep these primitives small, all thread queuing and suspending
//! functionality is offloaded to the *parking lot*. The idea behind this is
//! based on the Webkit [`WTF::ParkingLot`]
//! (https://webkit.org/blog/6161/locking-in-webkit/) class, which essentially
//! consists of a hash table mapping of lock addresses to queues of parked
//! (sleeping) threads. The Webkit parking lot was itself inspired by Linux
//! [futexes](http://man7.org/linux/man-pages/man2/futex.2.html), but it is more
//! powerful since it allows invoking callbacks while holding a queue lock.
//!
//! *Parking* refers to suspending the thread while simultaneously enqueuing it
//! on a queue keyed by some address. *Unparking* refers to dequeuing a thread
//! from a queue keyed by some address and resuming it. The parking lot API
//! consists of just 3 functions:
//!
//! ```rust,ignore
//! unsafe fn park(key: usize,
//! validate: &mut FnMut() -> bool,
//! before_sleep: &mut FnMut(),
//! timeout: Option<Instant>)
//! -> bool
//! ```
//!
//! This function performs the following steps:
//!
//! 1. Lock the queue associated with `key`.
//! 2. Call `validate`, if it returns `false`, unlock the queue and return.
//! 3. Add the current thread to the queue.
//! 4. Unlock the queue.
//! 5. Call `before_sleep`.
//! 6. Sleep until we are unparked or `timeout` is reached.
//! 7. Return `true` if we were unparked by another thread, `false` otherwise.
//!
//! ```rust,ignore
//! unsafe fn unpark_one(key: usize,
//! callback: &mut FnMut(UnparkResult))
//! -> UnparkResult
//! ```
//!
//! This function will unpark a single thread from the queue associated with
//! `key`. The `callback` function is invoked while holding the queue lock but
//! before the thread is unparked. The `UnparkResult` indicates whether the
//! queue was empty and, if not, whether there are still threads remaining in
//! the queue.
//!
//! ```rust,ignore
//! unsafe fn unpark_all(key: usize) -> usize
//! ```
//!
//! This function will unpark all threads in the queue associated with `key`. It
//! returns the number of threads that were unparked.
//!
//! # Building custom synchronization primitives
//!
//! Building custom synchronization primitives is very simple since
//! `parking_lot` takes care of all the hard parts for you. The most commmon
//! case for a custom primitive would be to integrate a `Mutex` inside another
//! data type. Since a mutex only requires 2 bits, it can share space with other
//! data. For example, one could create an `ArcMutex` type that combines the
//! atomic reference count and the two mutex bits in the same atomic word.
#![warn(missing_docs)]
#![feature(extended_compare_and_swap, const_fn, integer_atomics)]
// Spin limit from JikesRVM & Webkit experiments
const SPIN_LIMIT: usize = 40;
mod thread_parker;
mod word_lock;
mod parking_lot;
mod raw_mutex;
mod raw_rwlock;
mod condvar;
mod mutex;
mod rwlock;
mod once;
pub use once::{Once, OnceState};
pub use parking_lot::{UnparkResult, park, unpark_one, unpark_all};
pub use mutex::{Mutex, MutexGuard};
pub use condvar::{Condvar, WaitTimeoutResult};
pub use rwlock::{RwLock, RwLockReadGuard, RwLockWriteGuard};
+354
View File
@@ -0,0 +1,354 @@
// Copyright 2016 Amanieu d'Antras
//
// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
// http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
// http://opensource.org/licenses/MIT>, at your option. This file may not be
// copied, modified, or distributed except according to those terms.
use std::cell::UnsafeCell;
use std::ops::{Deref, DerefMut};
use std::fmt;
use raw_mutex::RawMutex;
/// A mutual exclusion primitive useful for protecting shared data
///
/// This mutex will block threads waiting for the lock to become available. The
/// mutex can also be statically initialized or created via a `new`
/// constructor. Each mutex has a type parameter which represents the data that
/// it is protecting. The data can only be accessed through the RAII guards
/// returned from `lock` and `try_lock`, which guarantees that the data is only
/// ever accessed when the mutex is locked.
///
/// # Differences from the standard library `Mutex`
///
/// - No poisoning, the lock is released normally on panic.
/// - Only requires 1 byte of space, whereas the standard library boxes the
/// `Mutex` due to platform limitations.
/// - A `MutexGuard` can be sent to another thread and unlocked there.
/// - Can be statically constructed (requires the `const_fn` nightly feature).
/// - Does not require any drop glue when dropped.
/// - Inline fast path for the uncontended case.
/// - Efficient handling of micro-contention using adaptive spinning.
///
/// # Examples
///
/// ```
/// use std::sync::Arc;
/// use parking_lot::Mutex;
/// use std::thread;
/// use std::sync::mpsc::channel;
///
/// const N: usize = 10;
///
/// // Spawn a few threads to increment a shared variable (non-atomically), and
/// // let the main thread know once all increments are done.
/// //
/// // Here we're using an Arc to share memory among threads, and the data inside
/// // the Arc is protected with a mutex.
/// let data = Arc::new(Mutex::new(0));
///
/// let (tx, rx) = channel();
/// for _ in 0..10 {
/// let (data, tx) = (data.clone(), tx.clone());
/// thread::spawn(move || {
/// // The shared state can only be accessed once the lock is held.
/// // Our non-atomic increment is safe because we're the only thread
/// // which can access the shared state when the lock is held.
/// let mut data = data.lock();
/// *data += 1;
/// if *data == N {
/// tx.send(()).unwrap();
/// }
/// // the lock is unlocked here when `data` goes out of scope.
/// });
/// }
///
/// rx.recv().unwrap();
/// ```
pub struct Mutex<T: ?Sized> {
mutex: RawMutex,
data: UnsafeCell<T>,
}
unsafe impl<T: Send> Send for Mutex<T> {}
unsafe impl<T: Send> Sync for Mutex<T> {}
/// An RAII implementation of a "scoped lock" of a mutex. When this structure is
/// dropped (falls out of scope), the lock will be unlocked.
///
/// The data protected by the mutex can be access through this guard via its
/// `Deref` and `DerefMut` implementations
#[must_use]
pub struct MutexGuard<'a, T: ?Sized + 'a> {
mutex: &'a RawMutex,
data: &'a mut T,
}
impl<T> Mutex<T> {
/// Creates a new mutex in an unlocked state ready for use.
pub const fn new(val: T) -> Mutex<T> {
Mutex {
data: UnsafeCell::new(val),
mutex: RawMutex::new(),
}
}
/// Consumes this mutex, returning the underlying data.
pub fn into_inner(self) -> T {
unsafe { self.data.into_inner() }
}
}
impl<T: ?Sized> Mutex<T> {
/// Acquires a mutex, blocking the current thread until it is able to do so.
///
/// This function will block the local thread until it is available to acquire
/// the mutex. Upon returning, the thread is the only thread with the mutex
/// held. An RAII guard is returned to allow scoped unlock of the lock. When
/// the guard goes out of scope, the mutex will be unlocked.
///
/// Attempts to lock a mutex in the thread which already holds the lock will
/// result is a deadlock.
pub fn lock(&self) -> MutexGuard<T> {
self.mutex.lock();
MutexGuard {
mutex: &self.mutex,
data: unsafe { &mut *self.data.get() },
}
}
/// Attempts to acquire this lock.
///
/// If the lock could not be acquired at this time, then `Err` is returned.
/// Otherwise, an RAII guard is returned. The lock will be unlocked when the
/// guard is dropped.
///
/// This function does not block.
pub fn try_lock(&self) -> Option<MutexGuard<T>> {
if self.mutex.try_lock() {
Some(MutexGuard {
mutex: &self.mutex,
data: unsafe { &mut *self.data.get() },
})
} else {
None
}
}
/// Returns a mutable reference to the underlying data.
///
/// Since this call borrows the `Mutex` mutably, no actual locking needs to
/// take place---the mutable borrow statically guarantees no locks exist.
pub fn get_mut(&mut self) -> &mut T {
unsafe { &mut *self.data.get() }
}
}
impl<T: ?Sized + Default> Default for Mutex<T> {
fn default() -> Mutex<T> {
Mutex::new(Default::default())
}
}
impl<T: ?Sized + fmt::Debug> fmt::Debug for Mutex<T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self.try_lock() {
Some(guard) => write!(f, "Mutex {{ data: {:?} }}", &*guard),
None => write!(f, "Mutex {{ <locked> }}"),
}
}
}
impl<'a, T: ?Sized + 'a> Deref for MutexGuard<'a, T> {
type Target = T;
fn deref(&self) -> &T {
self.data
}
}
impl<'a, T: ?Sized + 'a> DerefMut for MutexGuard<'a, T> {
fn deref_mut(&mut self) -> &mut T {
self.data
}
}
impl<'a, T: ?Sized + 'a> Drop for MutexGuard<'a, T> {
fn drop(&mut self) {
self.mutex.unlock();
}
}
// Helper function used by Condvar
pub fn guard_lock<'a, T: ?Sized>(guard: &MutexGuard<'a, T>) -> &'a RawMutex {
&guard.mutex
}
#[cfg(test)]
mod tests {
use std::sync::mpsc::channel;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;
use {Mutex, Condvar};
struct Packet<T>(Arc<(Mutex<T>, Condvar)>);
#[derive(Eq, PartialEq, Debug)]
struct NonCopy(i32);
unsafe impl<T: Send> Send for Packet<T> {}
unsafe impl<T> Sync for Packet<T> {}
#[test]
fn smoke() {
let m = Mutex::new(());
drop(m.lock());
drop(m.lock());
}
#[test]
fn lots_and_lots() {
static M: Mutex<()> = Mutex::new(());
static mut CNT: u32 = 0;
const J: u32 = 1000;
const K: u32 = 3;
fn inc() {
for _ in 0..J {
unsafe {
let _g = M.lock();
CNT += 1;
}
}
}
let (tx, rx) = channel();
for _ in 0..K {
let tx2 = tx.clone();
thread::spawn(move || {
inc();
tx2.send(()).unwrap();
});
let tx2 = tx.clone();
thread::spawn(move || {
inc();
tx2.send(()).unwrap();
});
}
drop(tx);
for _ in 0..2 * K {
rx.recv().unwrap();
}
assert_eq!(unsafe { CNT }, J * K * 2);
}
#[test]
fn try_lock() {
let m = Mutex::new(());
*m.try_lock().unwrap() = ();
}
#[test]
fn test_into_inner() {
let m = Mutex::new(NonCopy(10));
assert_eq!(m.into_inner(), NonCopy(10));
}
#[test]
fn test_into_inner_drop() {
struct Foo(Arc<AtomicUsize>);
impl Drop for Foo {
fn drop(&mut self) {
self.0.fetch_add(1, Ordering::SeqCst);
}
}
let num_drops = Arc::new(AtomicUsize::new(0));
let m = Mutex::new(Foo(num_drops.clone()));
assert_eq!(num_drops.load(Ordering::SeqCst), 0);
{
let _inner = m.into_inner();
assert_eq!(num_drops.load(Ordering::SeqCst), 0);
}
assert_eq!(num_drops.load(Ordering::SeqCst), 1);
}
#[test]
fn test_get_mut() {
let mut m = Mutex::new(NonCopy(10));
*m.get_mut() = NonCopy(20);
assert_eq!(m.into_inner(), NonCopy(20));
}
#[test]
fn test_mutex_arc_condvar() {
let packet = Packet(Arc::new((Mutex::new(false), Condvar::new())));
let packet2 = Packet(packet.0.clone());
let (tx, rx) = channel();
let _t = thread::spawn(move || {
// wait until parent gets in
rx.recv().unwrap();
let &(ref lock, ref cvar) = &*packet2.0;
let mut lock = lock.lock();
*lock = true;
cvar.notify_one();
});
let &(ref lock, ref cvar) = &*packet.0;
let mut lock = lock.lock();
tx.send(()).unwrap();
assert!(!*lock);
while !*lock {
cvar.wait(&mut lock);
}
}
#[test]
fn test_mutex_arc_nested() {
// Tests nested mutexes and access
// to underlying data.
let arc = Arc::new(Mutex::new(1));
let arc2 = Arc::new(Mutex::new(arc));
let (tx, rx) = channel();
let _t = thread::spawn(move || {
let lock = arc2.lock();
let lock2 = lock.lock();
assert_eq!(*lock2, 1);
tx.send(()).unwrap();
});
rx.recv().unwrap();
}
#[test]
fn test_mutex_arc_access_in_unwind() {
let arc = Arc::new(Mutex::new(1));
let arc2 = arc.clone();
let _ = thread::spawn(move || -> () {
struct Unwinder {
i: Arc<Mutex<i32>>,
}
impl Drop for Unwinder {
fn drop(&mut self) {
*self.i.lock() += 1;
}
}
let _u = Unwinder { i: arc2 };
panic!();
})
.join();
let lock = arc.lock();
assert_eq!(*lock, 2);
}
#[test]
fn test_mutex_unsized() {
let mutex: &Mutex<[i32]> = &Mutex::new([1, 2, 3]);
{
let b = &mut *mutex.lock();
b[0] = 4;
b[2] = 5;
}
let comp: &[i32] = &[4, 2, 5];
assert_eq!(&*mutex.lock(), comp);
}
}
+383
View File
@@ -0,0 +1,383 @@
// Copyright 2016 Amanieu d'Antras
//
// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
// http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
// http://opensource.org/licenses/MIT>, at your option. This file may not be
// copied, modified, or distributed except according to those terms.
use std::sync::atomic::{AtomicU8, Ordering, fence};
use std::thread;
use std::mem;
use parking_lot;
use SPIN_LIMIT;
const DONE_BIT: u8 = 1;
const POISON_BIT: u8 = 2;
const LOCKED_BIT: u8 = 4;
const PARKED_BIT: u8 = 8;
/// State yielded to the `call_once_force` method which can be used to query
/// whether the `Once` was previously poisoned or not.
pub struct OnceState(bool);
impl OnceState {
/// Returns whether the associated `Once` has been poisoned.
///
/// Once an initalization routine for a `Once` has panicked it will forever
/// indicate to future forced initialization routines that it is poisoned.
pub fn poisoned(&self) -> bool {
self.0
}
}
/// A synchronization primitive which can be used to run a one-time
/// initialization. Useful for one-time initialization for globals, FFI or
/// related functionality.
///
/// # Differences from the standard library `Once`
///
/// - Only requires 1 byte of space, instead of 1 word.
/// - Not required to be `'static`.
/// - Relaxed memory barriers in the fast path, which can significantly improve
/// performance on some architectures.
/// - Efficient handling of micro-contention using adaptive spinning.
///
/// # Examples
///
/// ```
/// # #![feature(const_fn)]
/// use parking_lot::Once;
///
/// static START: Once = Once::new();
///
/// START.call_once(|| {
/// // run initialization here
/// });
/// ```
pub struct Once(AtomicU8);
impl Once {
/// Creates a new `Once` value.
#[inline]
pub const fn new() -> Once {
Once(AtomicU8::new(0))
}
/// Performs an initialization routine once and only once. The given closure
/// will be executed if this is the first time `call_once` has been called,
/// and otherwise the routine will *not* be invoked.
///
/// This method will block the calling thread if another initialization
/// routine is currently running.
///
/// When this function returns, it is guaranteed that some initialization
/// has run and completed (it may not be the closure specified). It is also
/// guaranteed that any memory writes performed by the executed closure can
/// be reliably observed by other threads at this point (there is a
/// happens-before relation between the closure and code executing after the
/// return).
///
/// # Examples
///
/// ```
/// # #![feature(const_fn)]
/// use parking_lot::Once;
///
/// static mut VAL: usize = 0;
/// static INIT: Once = Once::new();
///
/// // Accessing a `static mut` is unsafe much of the time, but if we do so
/// // in a synchronized fashion (e.g. write once or read all) then we're
/// // good to go!
/// //
/// // This function will only call `expensive_computation` once, and will
/// // otherwise always return the value returned from the first invocation.
/// fn get_cached_val() -> usize {
/// unsafe {
/// INIT.call_once(|| {
/// VAL = expensive_computation();
/// });
/// VAL
/// }
/// }
///
/// fn expensive_computation() -> usize {
/// // ...
/// # 2
/// }
/// ```
///
/// # Panics
///
/// The closure `f` will only be executed once if this is called
/// concurrently amongst many threads. If that closure panics, however, then
/// it will *poison* this `Once` instance, causing all future invocations of
/// `call_once` to also panic.
#[inline]
pub fn call_once<F>(&self, f: F)
where F: FnOnce()
{
if self.0.load(Ordering::Acquire) == DONE_BIT {
return;
}
let mut f = Some(f);
self.call_once_slow(false, &mut |_| f.take().unwrap()());
}
/// Performs the same function as `call_once` except ignores poisoning.
///
/// If this `Once` has been poisoned (some initialization panicked) then
/// this function will continue to attempt to call initialization functions
/// until one of them doesn't panic.
///
/// The closure `f` is yielded a structure which can be used to query the
/// state of this `Once` (whether initialization has previously panicked or
/// not).
#[inline]
pub fn call_once_force<F>(&self, f: F)
where F: FnOnce(OnceState)
{
if self.0.load(Ordering::Acquire) == DONE_BIT {
return;
}
let mut f = Some(f);
self.call_once_slow(true, &mut |state| f.take().unwrap()(state));
}
// This is a non-generic function to reduce the monomorphization cost of
// using `call_once` (this isn't exactly a trivial or small implementation).
//
// Additionally, this is tagged with `#[cold]` as it should indeed be cold
// and it helps let LLVM know that calls to this function should be off the
// fast path. Essentially, this should help generate more straight line code
// in LLVM.
//
// Finally, this takes an `FnMut` instead of a `FnOnce` because there's
// currently no way to take an `FnOnce` and call it via virtual dispatch
// without some allocation overhead.
#[cold]
#[inline(never)]
fn call_once_slow(&self, ignore_poison: bool, f: &mut FnMut(OnceState)) {
let mut spin_count = 0;
let mut state = self.0.load(Ordering::Relaxed);
loop {
// If another thread called the closure, we're done
if state & DONE_BIT != 0 {
// An acquire fence is needed here since we didn't load the
// state with Ordering::Acquire.
fence(Ordering::Acquire);
return;
}
// If the state has been poisoned and we aren't forcing, then panic
if state & POISON_BIT != 0 && !ignore_poison {
// Need the fence here as well for the same reason
fence(Ordering::Acquire);
panic!("Once instance has previously been poisoned");
}
// Grab the lock if it isn't locked, even if there is a queue on it.
// We also clear the poison bit since we are going to try running
// the closure again.
if state & LOCKED_BIT == 0 {
match self.0
.compare_exchange_weak(state,
(state | LOCKED_BIT) & !POISON_BIT,
Ordering::Acquire,
Ordering::Relaxed) {
Ok(_) => break,
Err(x) => state = x,
}
continue;
}
// If there is no queue, try spinning a few times
if state & PARKED_BIT == 0 && spin_count < SPIN_LIMIT {
spin_count += 1;
thread::yield_now();
state = self.0.load(Ordering::Relaxed);
continue;
}
// Set the parked bit
if state & PARKED_BIT == 0 {
if let Err(x) = self.0.compare_exchange_weak(state,
state | PARKED_BIT,
Ordering::Relaxed,
Ordering::Relaxed) {
state = x;
continue;
}
}
// Park our thread until we are woken up by the thread that owns the
// lock.
unsafe {
let addr = self as *const _ as usize;
let validate = &mut || self.0.load(Ordering::Relaxed) == LOCKED_BIT | PARKED_BIT;
parking_lot::park(addr, validate, &mut || {}, None);
}
// Loop back and check if the done bit was set
}
struct PanicGuard<'a>(&'a Once);
impl<'a> Drop for PanicGuard<'a> {
fn drop(&mut self) {
// Mark the state as poisoned, unlock it and unpark all threads.
let once = self.0;
let state = once.0.swap(POISON_BIT, Ordering::Release);
if state & PARKED_BIT != 0 {
unsafe {
let addr = once as *const _ as usize;
parking_lot::unpark_all(addr);
}
}
}
}
// At this point we have the lock, so run the closure. Make sure we
// properly clean up if the closure panicks.
let guard = PanicGuard(self);
f(OnceState(state & POISON_BIT != 0));
mem::forget(guard);
// Now unlock the state, set the done bit and unpark all threads
let state = self.0.swap(DONE_BIT, Ordering::Release);
if state & PARKED_BIT != 0 {
unsafe {
let addr = self as *const _ as usize;
parking_lot::unpark_all(addr);
}
}
}
}
impl Default for Once {
fn default() -> Once {
Once::new()
}
}
#[cfg(test)]
mod tests {
use std::panic;
use std::sync::mpsc::channel;
use std::thread;
use Once;
#[test]
fn smoke_once() {
static O: Once = Once::new();
let mut a = 0;
O.call_once(|| a += 1);
assert_eq!(a, 1);
O.call_once(|| a += 1);
assert_eq!(a, 1);
}
#[test]
fn stampede_once() {
static O: Once = Once::new();
static mut run: bool = false;
let (tx, rx) = channel();
for _ in 0..10 {
let tx = tx.clone();
thread::spawn(move || {
for _ in 0..4 {
thread::yield_now()
}
unsafe {
O.call_once(|| {
assert!(!run);
run = true;
});
assert!(run);
}
tx.send(()).unwrap();
});
}
unsafe {
O.call_once(|| {
assert!(!run);
run = true;
});
assert!(run);
}
for _ in 0..10 {
rx.recv().unwrap();
}
}
#[test]
fn poison_bad() {
static O: Once = Once::new();
// poison the once
let t = panic::catch_unwind(|| {
O.call_once(|| panic!());
});
assert!(t.is_err());
// poisoning propagates
let t = panic::catch_unwind(|| {
O.call_once(|| {});
});
assert!(t.is_err());
// we can subvert poisoning, however
let mut called = false;
O.call_once_force(|p| {
called = true;
assert!(p.poisoned())
});
assert!(called);
// once any success happens, we stop propagating the poison
O.call_once(|| {});
}
#[test]
fn wait_for_force_to_finish() {
static O: Once = Once::new();
// poison the once
let t = panic::catch_unwind(|| {
O.call_once(|| panic!());
});
assert!(t.is_err());
// make sure someone's waiting inside the once via a force
let (tx1, rx1) = channel();
let (tx2, rx2) = channel();
let t1 = thread::spawn(move || {
O.call_once_force(|p| {
assert!(p.poisoned());
tx1.send(()).unwrap();
rx2.recv().unwrap();
});
});
rx1.recv().unwrap();
// put another waiter on the once
let t2 = thread::spawn(|| {
let mut called = false;
O.call_once(|| {
called = true;
});
assert!(!called);
});
tx2.send(()).unwrap();
assert!(t1.join().is_ok());
assert!(t2.join().is_ok());
}
}
+469
View File
@@ -0,0 +1,469 @@
// Copyright 2016 Amanieu d'Antras
//
// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
// http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
// http://opensource.org/licenses/MIT>, at your option. This file may not be
// copied, modified, or distributed except according to those terms.
use std::sync::atomic::{AtomicUsize, AtomicPtr, Ordering};
use std::time::Instant;
use std::cell::Cell;
use std::ptr;
use std::mem;
use thread_parker::ThreadParker;
use word_lock::WordLock;
static NUM_THREADS: AtomicUsize = AtomicUsize::new(0);
static HASHTABLE: AtomicPtr<HashTable> = AtomicPtr::new(ptr::null_mut());
thread_local!(static THREAD_DATA: ThreadData = ThreadData::new());
// Even with 3x more buckets than threads, the memory overhead per thread is
// still only a few hundred bytes per thread.
const LOAD_FACTOR: usize = 3;
struct HashTable {
// Hash buckets for the table
entries: Box<[Bucket]>,
// Number of bits used for the hash function
hash_bits: u32,
// Previous table. This is only kept to keep leak detectors happy.
_prev: *const HashTable,
}
impl HashTable {
fn new(num_threads: usize, prev: *const HashTable) -> Box<HashTable> {
let new_size = (num_threads * LOAD_FACTOR).next_power_of_two();
let hash_bits = 0usize.leading_zeros() - new_size.leading_zeros() - 1;
let bucket = Bucket {
mutex: WordLock::new(),
queue_head: Cell::new(ptr::null()),
queue_tail: Cell::new(ptr::null()),
_padding: unsafe { mem::uninitialized() },
};
Box::new(HashTable {
entries: vec![bucket; new_size].into_boxed_slice(),
hash_bits: hash_bits,
_prev: prev,
})
}
}
struct Bucket {
// Lock protecting the queue
mutex: WordLock,
// Linked list of threads waiting on this bucket
queue_head: Cell<*const ThreadData>,
queue_tail: Cell<*const ThreadData>,
// Padding to avoid false sharing between buckets. Ideally we would just
// align the bucket structure to 64 bytes, but Rust doesn't support that yet.
_padding: [u8; 64],
}
// Implementation of Clone for Bucket, needed to make vec![] work
impl Clone for Bucket {
fn clone(&self) -> Bucket {
Bucket {
mutex: WordLock::new(),
queue_head: Cell::new(ptr::null()),
queue_tail: Cell::new(ptr::null()),
_padding: unsafe { mem::uninitialized() },
}
}
}
struct ThreadData {
parker: ThreadParker,
key: Cell<usize>,
next_in_queue: Cell<*const ThreadData>,
}
impl ThreadData {
fn new() -> ThreadData {
// Keep track of the total number of live ThreadData objects and resize
// the hash table accordingly.
let num_threads = NUM_THREADS.fetch_add(1, Ordering::Relaxed) + 1;
unsafe {
grow_hashtable(num_threads);
}
ThreadData {
parker: ThreadParker::new(),
key: Cell::new(0),
next_in_queue: Cell::new(ptr::null()),
}
}
}
impl Drop for ThreadData {
fn drop(&mut self) {
NUM_THREADS.fetch_sub(1, Ordering::Relaxed);
}
}
// Grow the hash table so that it is big enough for the given number of threads.
// This isn't performance-critical since it is only done when a ThreadData is
// created, which only happens once per thread.
unsafe fn grow_hashtable(num_threads: usize) {
// If there is no table, create one
if HASHTABLE.load(Ordering::Relaxed).is_null() {
let new_table = Box::into_raw(HashTable::new(num_threads, ptr::null()));
// If this fails then it means some other thread created the hash
// table first.
if HASHTABLE.compare_exchange(ptr::null_mut(),
new_table,
Ordering::Release,
Ordering::Relaxed)
.is_ok() {
return;
}
// Free the table we created
Box::from_raw(new_table);
}
let mut old_table;
loop {
old_table = HASHTABLE.load(Ordering::Acquire);
// Check if we need to resize the existing table
if (*old_table).entries.len() >= LOAD_FACTOR * num_threads {
return;
}
// Lock all buckets in the old table
for b in &(*old_table).entries[..] {
b.mutex.lock();
}
// Now check if our table is still the latest one. Another thread could
// have grown the hash table between us reading HASHTABLE and locking
// the buckets.
if HASHTABLE.load(Ordering::Relaxed) == old_table {
break;
}
// Unlock buckets and try again
for b in &(*old_table).entries[..] {
b.mutex.unlock();
}
}
// Create the new table
let new_table = HashTable::new(num_threads, old_table);
// Move the entries from the old table to the new one
for b in &(*old_table).entries[..] {
let mut current = b.queue_head.get();
while !current.is_null() {
let next = (*current).next_in_queue.get();
let hash = hash((*current).key.get(), new_table.hash_bits);
if new_table.entries[hash].queue_tail.get().is_null() {
new_table.entries[hash].queue_head.set(current);
} else {
(*new_table.entries[hash].queue_tail.get()).next_in_queue.set(current);
}
new_table.entries[hash].queue_tail.set(current);
(*current).next_in_queue.set(ptr::null());
current = next;
}
}
// Publish the new table. No races are possible at this point because
// any other thread trying to grow the hash table is blocked on the bucket
// locks in the old table.
HASHTABLE.store(Box::into_raw(new_table), Ordering::Release);
// Unlock all buckets in the old table
for b in &(*old_table).entries[..] {
b.mutex.unlock();
}
}
// Hash function for addresses
#[cfg(target_pointer_width = "32")]
fn hash(key: usize, bits: u32) -> usize {
key.wrapping_mul(0x9E3779B9) >> (32 - bits)
}
#[cfg(target_pointer_width = "64")]
fn hash(key: usize, bits: u32) -> usize {
key.wrapping_mul(0x9E3779B97F4A7C15) >> (64 - bits)
}
// Lock the bucket for the given key
unsafe fn lock_bucket<'a>(key: usize) -> Option<&'a Bucket> {
let mut bucket;
loop {
let hashtable = HASHTABLE.load(Ordering::Acquire);
// If there is no hash table then there is no bucket to lock
if hashtable.is_null() {
return None;
}
let hash = hash(key, (*hashtable).hash_bits);
bucket = &(*hashtable).entries[hash];
// Lock the bucket
bucket.mutex.lock();
// If no other thread has rehashed the table before we grabbed the lock
// then we are good to go! The lock we grabbed prevents any rehashes.
if HASHTABLE.load(Ordering::Relaxed) == hashtable {
return Some(bucket);
}
// Unlock the bucket and try again
bucket.mutex.unlock();
}
}
/// Result of an `unpark_one` operation.
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
pub enum UnparkResult {
/// No parked threads were found for the given key.
NoParkedThreads,
/// One thread was unparked and it was one in the queue.
UnparkedLast,
/// One thread was unparked but there are more in the queue.
UnparkedNotLast,
}
/// Parks the current thread in the queue associated with the given key.
///
/// The `validate` function is called while the queue is locked and can abort
/// the operation by returning false. If `validate` returns true then the
/// current thread is appended to the queue and the queue is unlocked.
///
/// The `before_sleep` function is called after the queue is unlocked but before
/// the thread is put to sleep. The thread will then sleep until it is unparked
/// or the given timeout is reached.
///
/// This function returns `true` if the thread was unparked by a call to
/// `unpark_one` or `unpark_all`, and `false` if the validation function failed
/// or the timeout was reached.
///
/// # Safety
///
/// You should only call this function with an address that you control, since
/// you could otherwise interfere with the operation of other synchronization
/// primitives.
///
/// The `validate` function is called while the queue is locked and must not
/// panic or call into any function in `parking_lot`.
///
/// The `before_sleep` function is called outside the queue lock and is allowed
/// to call `unpark_one` or `unpark_all`, but it is not allowed to call `park`
/// or panic.
pub unsafe fn park(key: usize,
validate: &mut FnMut() -> bool,
before_sleep: &mut FnMut(),
timeout: Option<Instant>)
-> bool {
// Grab our thread data, this also ensures that the hash table exists
THREAD_DATA.with(|thread_data| {
// Lock the bucket for the given key
let bucket = lock_bucket(key).unwrap();
// If the validation function fails, just return
if !validate() {
bucket.mutex.unlock();
return false;
}
// Append our thread data to the queue and unlock the bucket
thread_data.next_in_queue.set(ptr::null());
thread_data.key.set(key);
thread_data.parker.prepare_park();
if !bucket.queue_head.get().is_null() {
(*bucket.queue_tail.get()).next_in_queue.set(thread_data);
} else {
bucket.queue_head.set(thread_data);
}
bucket.queue_tail.set(thread_data);
bucket.mutex.unlock();
// Invoke the pre-sleep callback
before_sleep();
// Park our thread and determine whether we were woken up by an unpark
// or by our timeout. Note that this isn't precise: we can still be
// unparked since we are still in the queue.
let unparked = match timeout {
Some(timeout) => thread_data.parker.park_until(timeout),
None => {
thread_data.parker.park();
true
}
};
// If we were unparked, return now
if unparked {
return true;
}
// Lock our bucket again. Note that the hashtable may have been rehashed
// in the meantime.
let bucket = lock_bucket(key).unwrap();
// Now we need to check again if we were unparked or timed out. Unlike
// the last check this is precise because we hold the bucket lock.
if !thread_data.parker.timed_out() {
bucket.mutex.unlock();
return true;
}
// We timed out, so we now need to remove our thread from the queue
let mut link = &bucket.queue_head;
let mut current = bucket.queue_head.get();
let mut previous = ptr::null();
while !current.is_null() {
if current == thread_data {
let next = (*current).next_in_queue.get();
link.set(next);
if bucket.queue_tail.get() == current {
bucket.queue_tail.set(previous);
}
break;
} else {
link = &(*current).next_in_queue;
previous = current;
current = link.get();
}
}
// Unlock the bucket, we are done
bucket.mutex.unlock();
false
})
}
/// Unparks one thread from the queue associated with the given key.
///
/// The `callback` function is called while the queue is locked and before the
/// target thread is woken up. The `UnparkResult` argument to the function
/// indicates whether a thread was found in the queue and whether this was the
/// last thread in the queue. This value is also returned by `unpark_one`.
///
/// # Safety
///
/// You should only call this function with an address that you control, since
/// you could otherwise interfere with the operation of other synchronization
/// primitives.
///
/// The `callback` function is called while the queue is locked and must not
/// panic or call into any function in `parking_lot`.
pub unsafe fn unpark_one(key: usize, callback: &mut FnMut(UnparkResult)) -> UnparkResult {
// Lock the bucket for the given key
let bucket = match lock_bucket(key) {
Some(x) => x,
None => {
// If there is no hash table then there is nothing to unpark
callback(UnparkResult::NoParkedThreads);
return UnparkResult::NoParkedThreads;
}
};
// Find a thread with a matching key and remove it from the queue
let mut link = &bucket.queue_head;
let mut current = bucket.queue_head.get();
let mut previous = ptr::null();
while !current.is_null() {
if (*current).key.get() == key {
// Remove the thread from the queue
let next = (*current).next_in_queue.get();
link.set(next);
let mut result = UnparkResult::UnparkedLast;
if bucket.queue_tail.get() == current {
bucket.queue_tail.set(previous);
} else {
// Scan the rest of the queue to see if there are any other
// entries with the given key.
let mut scan = next;
while !scan.is_null() {
if (*scan).key.get() == key {
result = UnparkResult::UnparkedNotLast;
break;
}
scan = (*scan).next_in_queue.get();
}
}
// Invoke the callback before waking up the thread
callback(result);
// Unpark the thread while holding the bucket lock to avoid race
// conditions with timeouts. Once unparked, the thread will act as
// if it was woken up by an unpark even if it reached its timeout.
(*current).parker.unpark();
bucket.mutex.unlock();
return result;
} else {
link = &(*current).next_in_queue;
previous = current;
current = link.get();
}
}
// No threads with a matching key were found in the bucket
callback(UnparkResult::NoParkedThreads);
bucket.mutex.unlock();
UnparkResult::NoParkedThreads
}
/// Unparks all threads in the queue associated with the given key.
///
/// This function returns the number of threads that were unparked.
///
/// # Safety
///
/// You should only call this function with an address that you control, since
/// you could otherwise interfere with the operation of other synchronization
/// primitives.
pub unsafe fn unpark_all(key: usize) -> usize {
// Lock the bucket for the given key
let bucket = match lock_bucket(key) {
Some(x) => x,
// If there is no hash table then there is nothing to unpark
None => return 0,
};
// Remove all threads with the given key in the bucket
let mut link = &bucket.queue_head;
let mut current = bucket.queue_head.get();
let mut previous = ptr::null();
let mut num_threads = 0;
while !current.is_null() {
if (*current).key.get() == key {
// Remove the thread from the queue
let next = (*current).next_in_queue.get();
link.set(next);
if bucket.queue_tail.get() == current {
bucket.queue_tail.set(previous);
}
// Unpark the thread while holding the bucket lock to avoid race
// conditions with timeouts. Once unparked, the thread will act as
// if it was woken up by an unpark even if it reached its timeout.
(*current).parker.unpark();
num_threads += 1;
current = next;
} else {
link = &(*current).next_in_queue;
previous = current;
current = link.get();
}
}
// Unlock the bucket
bucket.mutex.unlock();
num_threads
}
+146
View File
@@ -0,0 +1,146 @@
// Copyright 2016 Amanieu d'Antras
//
// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
// http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
// http://opensource.org/licenses/MIT>, at your option. This file may not be
// copied, modified, or distributed except according to those terms.
use std::sync::atomic::{AtomicU8, Ordering};
use std::thread;
use parking_lot::{self, UnparkResult};
use SPIN_LIMIT;
const LOCKED_BIT: u8 = 1;
const PARKED_BIT: u8 = 2;
pub struct RawMutex {
state: AtomicU8,
}
impl RawMutex {
#[inline]
pub const fn new() -> RawMutex {
RawMutex { state: AtomicU8::new(0) }
}
#[inline]
pub fn lock(&self) {
if self.state
.compare_exchange_weak(0, LOCKED_BIT, Ordering::Acquire, Ordering::Relaxed)
.is_ok() {
return;
}
self.lock_slow();
}
#[inline]
pub fn try_lock(&self) -> bool {
let mut state = self.state.load(Ordering::Relaxed);
loop {
if state & LOCKED_BIT != 0 {
return false;
}
match self.state.compare_exchange_weak(state,
state | LOCKED_BIT,
Ordering::Acquire,
Ordering::Relaxed) {
Ok(_) => return true,
Err(x) => state = x,
}
}
}
#[inline]
pub fn unlock(&self) {
if self.state
.compare_exchange_weak(LOCKED_BIT, 0, Ordering::Release, Ordering::Relaxed)
.is_ok() {
return;
}
self.unlock_slow();
}
#[cold]
#[inline(never)]
fn lock_slow(&self) {
let mut spin_count = 0;
let mut state = self.state.load(Ordering::Relaxed);
loop {
// Grab the lock if it isn't locked, even if there is a queue on it
if state & LOCKED_BIT == 0 {
match self.state
.compare_exchange_weak(state,
state | LOCKED_BIT,
Ordering::Acquire,
Ordering::Relaxed) {
Ok(_) => return,
Err(x) => state = x,
}
continue;
}
// If there is no queue, try spinning a few times
if state & PARKED_BIT == 0 && spin_count < SPIN_LIMIT {
spin_count += 1;
thread::yield_now();
state = self.state.load(Ordering::Relaxed);
continue;
}
// Set the parked bit
if state & PARKED_BIT == 0 {
if let Err(x) = self.state.compare_exchange_weak(state,
state | PARKED_BIT,
Ordering::Relaxed,
Ordering::Relaxed) {
state = x;
continue;
}
}
// Park our thread until we are woken up by an unlock
unsafe {
let addr = self as *const _ as usize;
let validate = &mut || {
self.state.load(Ordering::Relaxed) == LOCKED_BIT | PARKED_BIT
};
parking_lot::park(addr, validate, &mut || {}, None);
}
// Loop back and try locking again
state = self.state.load(Ordering::Relaxed);
}
}
#[cold]
#[inline(never)]
fn unlock_slow(&self) {
let mut state = self.state.load(Ordering::Relaxed);
loop {
// Unlock directly if there are no parked threads
if state & PARKED_BIT == 0 {
match self.state
.compare_exchange_weak(LOCKED_BIT, 0, Ordering::Release, Ordering::Relaxed) {
Ok(_) => return,
Err(x) => state = x,
}
continue;
}
// Unpark one thread and leave the parked bit set if there might
// still be parked threads on this address.
unsafe {
let addr = self as *const _ as usize;
let callback = &mut |result| {
if result == UnparkResult::UnparkedNotLast {
self.state.store(PARKED_BIT, Ordering::Release);
} else {
self.state.store(0, Ordering::Release);
}
};
parking_lot::unpark_one(addr, callback);
}
break;
}
}
}
+333
View File
@@ -0,0 +1,333 @@
// Copyright 2016 Amanieu d'Antras
//
// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
// http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
// http://opensource.org/licenses/MIT>, at your option. This file may not be
// copied, modified, or distributed except according to those terms.
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;
use parking_lot::{self, UnparkResult};
use SPIN_LIMIT;
const SHARED_PARKED_BIT: usize = 1;
const EXCLUSIVE_PARKED_BIT: usize = 2;
const EXCLUSIVE_LOCKED_BIT: usize = 4;
const SHARED_COUNT_MASK: usize = !7;
const SHARED_COUNT_INC: usize = 8;
pub struct RawRwLock {
state: AtomicUsize,
}
impl RawRwLock {
#[inline]
pub const fn new() -> RawRwLock {
RawRwLock { state: AtomicUsize::new(0) }
}
#[inline]
pub fn lock_exclusive(&self) {
if self.state
.compare_exchange_weak(0,
EXCLUSIVE_LOCKED_BIT,
Ordering::Acquire,
Ordering::Relaxed)
.is_ok() {
return;
}
self.lock_exclusive_slow();
}
#[inline]
pub fn try_lock_exclusive(&self) -> bool {
let mut state = self.state.load(Ordering::Relaxed);
loop {
if state & (EXCLUSIVE_LOCKED_BIT | SHARED_COUNT_MASK) != 0 {
return false;
}
match self.state.compare_exchange_weak(state,
state | EXCLUSIVE_LOCKED_BIT,
Ordering::Acquire,
Ordering::Relaxed) {
Ok(_) => return true,
Err(x) => state = x,
}
}
}
#[inline]
pub fn unlock_exclusive(&self) {
if self.state
.compare_exchange_weak(EXCLUSIVE_LOCKED_BIT,
0,
Ordering::Release,
Ordering::Relaxed)
.is_ok() {
return;
}
self.unlock_exclusive_slow();
}
#[inline]
pub fn lock_shared(&self) {
let state = self.state.load(Ordering::Relaxed);
if state & (EXCLUSIVE_LOCKED_BIT | EXCLUSIVE_PARKED_BIT) == 0 &&
self.state
.compare_exchange_weak(state,
state.checked_add(SHARED_COUNT_INC)
.expect("RwLock shared count overflow"),
Ordering::Acquire,
Ordering::Relaxed)
.is_ok() {
return;
}
self.lock_shared_slow();
}
#[inline]
pub fn try_lock_shared(&self) -> bool {
let mut state = self.state.load(Ordering::Relaxed);
loop {
if state & (EXCLUSIVE_LOCKED_BIT | EXCLUSIVE_PARKED_BIT) != 0 {
return false;
}
match self.state.compare_exchange_weak(state,
state.checked_add(SHARED_COUNT_INC)
.expect("RwLock shared count overflow"),
Ordering::Acquire,
Ordering::Relaxed) {
Ok(_) => return true,
Err(x) => state = x,
}
}
}
#[inline]
pub fn unlock_shared(&self) {
let state = self.state.load(Ordering::Relaxed);
if (state & EXCLUSIVE_PARKED_BIT == 0 || state & SHARED_COUNT_MASK != SHARED_COUNT_INC) &&
self.state
.compare_exchange_weak(state,
state - SHARED_COUNT_INC,
Ordering::Release,
Ordering::Relaxed)
.is_ok() {
return;
}
self.unlock_shared_slow();
}
#[cold]
#[inline(never)]
pub fn lock_exclusive_slow(&self) {
let mut spin_count = 0;
let mut state = self.state.load(Ordering::Relaxed);
loop {
// Grab the lock if it isn't locked, even if there are other
// threads parked.
if state & (EXCLUSIVE_LOCKED_BIT | SHARED_COUNT_MASK) == 0 {
match self.state
.compare_exchange_weak(state,
state | EXCLUSIVE_LOCKED_BIT,
Ordering::Acquire,
Ordering::Relaxed) {
Ok(_) => return,
Err(x) => state = x,
}
continue;
}
// If there are no parked exclusive threads, try spinning a few times
if state & EXCLUSIVE_PARKED_BIT == 0 && spin_count < SPIN_LIMIT {
spin_count += 1;
thread::yield_now();
state = self.state.load(Ordering::Relaxed);
continue;
}
// Set the parked bit
if state & EXCLUSIVE_PARKED_BIT == 0 {
if let Err(x) = self.state.compare_exchange_weak(state,
state | EXCLUSIVE_PARKED_BIT,
Ordering::Relaxed,
Ordering::Relaxed) {
state = x;
continue;
}
}
// Park our thread until we are woken up by an unlock
unsafe {
let addr = self as *const _ as usize;
let validate = &mut || {
self.state.load(Ordering::Relaxed) &
(EXCLUSIVE_LOCKED_BIT | EXCLUSIVE_PARKED_BIT) ==
EXCLUSIVE_LOCKED_BIT | EXCLUSIVE_PARKED_BIT
};
if parking_lot::park(addr, validate, &mut || {}, None) {
// If we successfully parked then the lock will be handed
// off to us.
return;
}
}
// Loop back and try locking again
state = self.state.load(Ordering::Relaxed);
}
}
#[cold]
#[inline(never)]
pub fn unlock_exclusive_slow(&self) {
let mut state = self.state.load(Ordering::Relaxed);
loop {
// Unlock directly if there are no parked threads
if state & (EXCLUSIVE_PARKED_BIT | SHARED_PARKED_BIT) == 0 {
match self.state
.compare_exchange_weak(EXCLUSIVE_LOCKED_BIT,
0,
Ordering::Release,
Ordering::Relaxed) {
Ok(_) => return,
Err(x) => state = x,
}
continue;
}
// If there are exclusive parked threads, hand off the lock to one
// of them without unlocking it. This is needed to avoid writer
// starvation.
if state & EXCLUSIVE_PARKED_BIT != 0 {
unsafe {
let addr = self as *const _ as usize;
let callback = &mut |result| {
// Clear the exclusive parked bit if this was the last thread
if result != UnparkResult::UnparkedNotLast {
self.state.fetch_and(!EXCLUSIVE_PARKED_BIT, Ordering::Relaxed);
}
};
if parking_lot::unpark_one(addr, callback) != UnparkResult::NoParkedThreads {
// We successfully self.state an exclusive thread and the lock
// has been handed off to it.
return;
}
}
}
// Release the exclusive lock and clear the shared parked bit
if let Err(x) = self.state
.compare_exchange_weak(EXCLUSIVE_LOCKED_BIT | SHARED_PARKED_BIT,
0,
Ordering::Release,
Ordering::Relaxed) {
state = x;
continue;
}
// Unpark all waiting shared threads.
unsafe {
let addr = self as *const _ as usize;
parking_lot::unpark_all(addr + 1);
}
break;
}
}
#[cold]
#[inline(never)]
pub fn lock_shared_slow(&self) {
let mut spin_count = 0;
let mut state = self.state.load(Ordering::Relaxed);
loop {
// Grab the lock if there are no exclusive threads locked or waiting
if state & (EXCLUSIVE_LOCKED_BIT | EXCLUSIVE_PARKED_BIT) == 0 {
match self.state
.compare_exchange_weak(state,
state.checked_add(SHARED_COUNT_INC)
.expect("RwLock shared count overflow"),
Ordering::Acquire,
Ordering::Relaxed) {
Ok(_) => return,
Err(x) => state = x,
}
continue;
}
// If there are no parked exclusive threads, try spinning a few times
if state & EXCLUSIVE_PARKED_BIT == 0 && spin_count < SPIN_LIMIT {
spin_count += 1;
thread::yield_now();
state = self.state.load(Ordering::Relaxed);
continue;
}
// Set the shared parked bit
if state & SHARED_PARKED_BIT == 0 {
if let Err(x) = self.state.compare_exchange_weak(state,
state | SHARED_PARKED_BIT,
Ordering::Relaxed,
Ordering::Relaxed) {
state = x;
continue;
}
}
// Park our thread until we are woken up by an unlock
unsafe {
let addr = self as *const _ as usize;
let validate = &mut || {
self.state.load(Ordering::Relaxed) &
(EXCLUSIVE_LOCKED_BIT | SHARED_PARKED_BIT) ==
EXCLUSIVE_LOCKED_BIT | SHARED_PARKED_BIT
};
parking_lot::park(addr + 1, validate, &mut || {}, None);
}
// Loop back and try locking again
state = self.state.load(Ordering::Relaxed);
}
}
#[cold]
#[inline(never)]
pub fn unlock_shared_slow(&self) {
let mut state = self.state.load(Ordering::Relaxed);
loop {
// Unlock directly if there are no parked threads or if there are
// still remaining shared threads
if state & EXCLUSIVE_PARKED_BIT == 0 || state & SHARED_COUNT_MASK != SHARED_COUNT_INC {
match self.state
.compare_exchange_weak(state,
state - SHARED_COUNT_INC,
Ordering::Release,
Ordering::Relaxed) {
Ok(_) => return,
Err(x) => state = x,
}
continue;
}
// At this point we are the last shared thread and we need to wake
// up one exclusive thread to pass the lock to.
unsafe {
let addr = self as *const _ as usize;
let callback = &mut |result| {
// Clear the exclusive parked bit if this was the last thread
if result != UnparkResult::UnparkedNotLast {
self.state.fetch_and(!EXCLUSIVE_PARKED_BIT, Ordering::Relaxed);
}
};
if parking_lot::unpark_one(addr, callback) != UnparkResult::NoParkedThreads {
// We successfully self.state an exclusive thread and the lock
// has been handed off to it.
return;
}
}
// There was no exclusive thread to wake up, so just loop back again
// and try to release the lock normally.
state = self.state.load(Ordering::Relaxed);
}
}
}
+463
View File
@@ -0,0 +1,463 @@
// Copyright 2016 Amanieu d'Antras
//
// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
// http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
// http://opensource.org/licenses/MIT>, at your option. This file may not be
// copied, modified, or distributed except according to those terms.
use std::cell::UnsafeCell;
use std::ops::{Deref, DerefMut};
use std::fmt;
use raw_rwlock::RawRwLock;
/// A reader-writer lock
///
/// This type of lock allows a number of readers or at most one writer at any
/// point in time. The write portion of this lock typically allows modification
/// of the underlying data (exclusive access) and the read portion of this lock
/// typically allows for read-only access (shared access).
///
/// This lock will always prioritize writers over readers to avoid writer
/// starvation. This means that readers trying to acquire the lock will block
/// even if the lock is unlocked when there are writers waiting to acquire the
/// lock.
///
/// The type parameter `T` represents the data that this lock protects. It is
/// required that `T` satisfies `Send` to be shared across threads and `Sync` to
/// allow concurrent access through readers. The RAII guards returned from the
/// locking methods implement `Deref` (and `DerefMut` for the `write` methods)
/// to allow access to the contained of the lock.
///
/// # Differences from the standard library `RwLock`
///
/// - Writer-preferred policy instead of an unspecified platform default.
/// - No poisoning, the lock is released normally on panic.
/// - Only requires 1 word of space, whereas the standard library boxes the
/// `RwLock` due to platform limitations.
/// - A lock guard can be sent to another thread and unlocked there.
/// - Can be statically constructed (requires the `const_fn` nightly feature).
/// - Does not require any drop glue when dropped.
/// - Inline fast path for the uncontended case.
/// - Efficient handling of micro-contention using adaptive spinning.
///
/// # Examples
///
/// ```
/// use parking_lot::RwLock;
///
/// let lock = RwLock::new(5);
///
/// // many reader locks can be held at once
/// {
/// let r1 = lock.read();
/// let r2 = lock.read();
/// assert_eq!(*r1, 5);
/// assert_eq!(*r2, 5);
/// } // read locks are dropped at this point
///
/// // only one write lock may be held, however
/// {
/// let mut w = lock.write();
/// *w += 1;
/// assert_eq!(*w, 6);
/// } // write lock is dropped here
/// ```
pub struct RwLock<T: ?Sized> {
rwlock: RawRwLock,
data: UnsafeCell<T>,
}
unsafe impl<T: ?Sized + Send> Send for RwLock<T> {}
unsafe impl<T: ?Sized + Send + Sync> Sync for RwLock<T> {}
/// RAII structure used to release the shared read access of a lock when
/// dropped.
#[must_use]
pub struct RwLockReadGuard<'a, T: ?Sized + 'a> {
rwlock: &'a RawRwLock,
data: &'a T,
}
/// RAII structure used to release the exclusive write access of a lock when
/// dropped.
#[must_use]
pub struct RwLockWriteGuard<'a, T: ?Sized + 'a> {
rwlock: &'a RawRwLock,
data: &'a mut T,
}
impl<T> RwLock<T> {
/// Creates a new instance of an `RwLock<T>` which is unlocked.
///
/// # Examples
///
/// ```
/// use parking_lot::RwLock;
///
/// let lock = RwLock::new(5);
/// ```
pub const fn new(val: T) -> RwLock<T> {
RwLock {
data: UnsafeCell::new(val),
rwlock: RawRwLock::new(),
}
}
/// Consumes this `RwLock`, returning the underlying data.
pub fn into_inner(self) -> T {
unsafe { self.data.into_inner() }
}
}
impl<T: ?Sized> RwLock<T> {
/// Locks this rwlock with shared read access, blocking the current thread
/// until it can be acquired.
///
/// The calling thread will be blocked until there are no more writers which
/// hold the lock. There may be other readers currently inside the lock when
/// this method returns. This method does not provide any guarantees with
/// respect to the ordering of whether contentious readers or writers will
/// acquire the lock first.
///
/// Returns an RAII guard which will release this thread's shared access
/// once it is dropped.
pub fn read(&self) -> RwLockReadGuard<T> {
self.rwlock.lock_shared();
RwLockReadGuard {
rwlock: &self.rwlock,
data: unsafe { &*self.data.get() },
}
}
/// Attempts to acquire this rwlock with shared read access.
///
/// If the access could not be granted at this time, then `Err` is returned.
/// Otherwise, an RAII guard is returned which will release the shared access
/// when it is dropped.
///
/// This function does not block.
///
/// This function does not provide any guarantees with respect to the ordering
/// of whether contentious readers or writers will acquire the lock first.
pub fn try_read(&self) -> Option<RwLockReadGuard<T>> {
if self.rwlock.try_lock_shared() {
Some(RwLockReadGuard {
rwlock: &self.rwlock,
data: unsafe { &*self.data.get() },
})
} else {
None
}
}
/// Locks this rwlock with exclusive write access, blocking the current
/// thread until it can be acquired.
///
/// This function will not return while other writers or other readers
/// currently have access to the lock.
///
/// Returns an RAII guard which will drop the write access of this rwlock
/// when dropped.
pub fn write(&self) -> RwLockWriteGuard<T> {
self.rwlock.lock_exclusive();
RwLockWriteGuard {
rwlock: &self.rwlock,
data: unsafe { &mut *self.data.get() },
}
}
/// Attempts to lock this rwlock with exclusive write access.
///
/// If the lock could not be acquired at this time, then `Err` is returned.
/// Otherwise, an RAII guard is returned which will release the lock when
/// it is dropped.
///
/// This function does not block.
///
/// This function does not provide any guarantees with respect to the ordering
/// of whether contentious readers or writers will acquire the lock first.
pub fn try_write(&self) -> Option<RwLockWriteGuard<T>> {
if self.rwlock.try_lock_exclusive() {
Some(RwLockWriteGuard {
rwlock: &self.rwlock,
data: unsafe { &mut *self.data.get() },
})
} else {
None
}
}
/// Returns a mutable reference to the underlying data.
///
/// Since this call borrows the `RwLock` mutably, no actual locking needs to
/// take place---the mutable borrow statically guarantees no locks exist.
pub fn get_mut(&mut self) -> &mut T {
unsafe { &mut *self.data.get() }
}
}
impl<T: ?Sized + Default> Default for RwLock<T> {
fn default() -> RwLock<T> {
RwLock::new(Default::default())
}
}
impl<T: ?Sized + fmt::Debug> fmt::Debug for RwLock<T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self.try_read() {
Some(guard) => write!(f, "RwLock {{ data: {:?} }}", &*guard),
None => write!(f, "RwLock {{ <locked> }}"),
}
}
}
impl<'a, T: ?Sized + 'a> Deref for RwLockReadGuard<'a, T> {
type Target = T;
fn deref(&self) -> &T {
self.data
}
}
impl<'a, T: ?Sized + 'a> Drop for RwLockReadGuard<'a, T> {
fn drop(&mut self) {
self.rwlock.unlock_shared();
}
}
impl<'a, T: ?Sized + 'a> Deref for RwLockWriteGuard<'a, T> {
type Target = T;
fn deref(&self) -> &T {
self.data
}
}
impl<'a, T: ?Sized + 'a> DerefMut for RwLockWriteGuard<'a, T> {
fn deref_mut(&mut self) -> &mut T {
self.data
}
}
impl<'a, T: ?Sized + 'a> Drop for RwLockWriteGuard<'a, T> {
fn drop(&mut self) {
self.rwlock.unlock_exclusive();
}
}
#[cfg(test)]
mod tests {
extern crate rand;
use self::rand::Rng;
use std::sync::mpsc::channel;
use std::thread;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use RwLock;
#[derive(Eq, PartialEq, Debug)]
struct NonCopy(i32);
#[test]
fn smoke() {
let l = RwLock::new(());
drop(l.read());
drop(l.write());
drop((l.read(), l.read()));
drop(l.write());
}
#[test]
fn frob() {
static R: RwLock<()> = RwLock::new(());
const N: u32 = 10;
const M: u32 = 1000;
let (tx, rx) = channel::<()>();
for _ in 0..N {
let tx = tx.clone();
thread::spawn(move || {
let mut rng = rand::thread_rng();
for _ in 0..M {
if rng.gen_weighted_bool(N) {
drop(R.write());
} else {
drop(R.read());
}
}
drop(tx);
});
}
drop(tx);
let _ = rx.recv();
}
#[test]
fn test_rw_arc_no_poison_wr() {
let arc = Arc::new(RwLock::new(1));
let arc2 = arc.clone();
let _: Result<(), _> = thread::spawn(move || {
let _lock = arc2.write();
panic!();
})
.join();
let lock = arc.read();
assert_eq!(*lock, 1);
}
#[test]
fn test_rw_arc_no_poison_ww() {
let arc = Arc::new(RwLock::new(1));
let arc2 = arc.clone();
let _: Result<(), _> = thread::spawn(move || {
let _lock = arc2.write();
panic!();
})
.join();
let lock = arc.write();
assert_eq!(*lock, 1);
}
#[test]
fn test_rw_arc_no_poison_rr() {
let arc = Arc::new(RwLock::new(1));
let arc2 = arc.clone();
let _: Result<(), _> = thread::spawn(move || {
let _lock = arc2.read();
panic!();
})
.join();
let lock = arc.read();
assert_eq!(*lock, 1);
}
#[test]
fn test_rw_arc_no_poison_rw() {
let arc = Arc::new(RwLock::new(1));
let arc2 = arc.clone();
let _: Result<(), _> = thread::spawn(move || {
let _lock = arc2.read();
panic!()
})
.join();
let lock = arc.write();
assert_eq!(*lock, 1);
}
#[test]
fn test_rw_arc() {
let arc = Arc::new(RwLock::new(0));
let arc2 = arc.clone();
let (tx, rx) = channel();
thread::spawn(move || {
let mut lock = arc2.write();
for _ in 0..10 {
let tmp = *lock;
*lock = -1;
thread::yield_now();
*lock = tmp + 1;
}
tx.send(()).unwrap();
});
// Readers try to catch the writer in the act
let mut children = Vec::new();
for _ in 0..5 {
let arc3 = arc.clone();
children.push(thread::spawn(move || {
let lock = arc3.read();
assert!(*lock >= 0);
}));
}
// Wait for children to pass their asserts
for r in children {
assert!(r.join().is_ok());
}
// Wait for writer to finish
rx.recv().unwrap();
let lock = arc.read();
assert_eq!(*lock, 10);
}
#[test]
fn test_rw_arc_access_in_unwind() {
let arc = Arc::new(RwLock::new(1));
let arc2 = arc.clone();
let _ = thread::spawn(move || -> () {
struct Unwinder {
i: Arc<RwLock<isize>>,
}
impl Drop for Unwinder {
fn drop(&mut self) {
let mut lock = self.i.write();
*lock += 1;
}
}
let _u = Unwinder { i: arc2 };
panic!();
})
.join();
let lock = arc.read();
assert_eq!(*lock, 2);
}
#[test]
fn test_rwlock_unsized() {
let rw: &RwLock<[i32]> = &RwLock::new([1, 2, 3]);
{
let b = &mut *rw.write();
b[0] = 4;
b[2] = 5;
}
let comp: &[i32] = &[4, 2, 5];
assert_eq!(&*rw.read(), comp);
}
#[test]
fn test_rwlock_try_write() {
let lock = RwLock::new(0isize);
let read_guard = lock.read();
let write_result = lock.try_write();
match write_result {
None => (),
Some(_) => {
assert!(false,
"try_write should not succeed while read_guard is in scope")
}
}
drop(read_guard);
}
#[test]
fn test_into_inner() {
let m = RwLock::new(NonCopy(10));
assert_eq!(m.into_inner(), NonCopy(10));
}
#[test]
fn test_into_inner_drop() {
struct Foo(Arc<AtomicUsize>);
impl Drop for Foo {
fn drop(&mut self) {
self.0.fetch_add(1, Ordering::SeqCst);
}
}
let num_drops = Arc::new(AtomicUsize::new(0));
let m = RwLock::new(Foo(num_drops.clone()));
assert_eq!(num_drops.load(Ordering::SeqCst), 0);
{
let _inner = m.into_inner();
assert_eq!(num_drops.load(Ordering::SeqCst), 0);
}
assert_eq!(num_drops.load(Ordering::SeqCst), 1);
}
#[test]
fn test_get_mut() {
let mut m = RwLock::new(NonCopy(10));
*m.get_mut() = NonCopy(20);
assert_eq!(m.into_inner(), NonCopy(20));
}
}
+74
View File
@@ -0,0 +1,74 @@
// Copyright 2016 Amanieu d'Antras
//
// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
// http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
// http://opensource.org/licenses/MIT>, at your option. This file may not be
// copied, modified, or distributed except according to those terms.
use std::sync::{Mutex, Condvar};
use std::cell::Cell;
use std::time::Instant;
// Helper type for putting a thread to sleep until some other thread wakes it up
pub struct ThreadParker {
should_park: Cell<bool>,
mutex: Mutex<()>,
condvar: Condvar,
}
impl ThreadParker {
pub fn new() -> ThreadParker {
ThreadParker {
should_park: Cell::new(false),
mutex: Mutex::new(()),
condvar: Condvar::new(),
}
}
// Prepares the parker. This should be called before adding it to the queue.
pub fn prepare_park(&self) {
self.should_park.set(true);
}
// Checks if the park timed out. This should be called while holding the
// queue lock after park_until has returned false.
pub fn timed_out(&self) -> bool {
self.should_park.get()
}
// Parks the thread until it is unparked. This should be called after it has
// been added to the queue, after unlocking the queue.
pub fn park(&self) {
let mut lock = self.mutex.lock().unwrap();
while self.should_park.get() {
lock = self.condvar.wait(lock).unwrap();
}
}
// Parks the thread until it is unparked or the timeout is reached. This
// should be called after it has been added to the queue, after unlocking
// the queue. Returns true if we were unparked and false if we timed out.
pub fn park_until(&self, timeout: Instant) -> bool {
let mut lock = self.mutex.lock().unwrap();
while self.should_park.get() {
let now = Instant::now();
if timeout <= now {
return false;
}
let (new_lock, _) = self.condvar.wait_timeout(lock, timeout - now).unwrap();
lock = new_lock;
}
true
}
// Wakes up the parker. This should be called while holding the queue lock.
pub fn unpark(&self) {
let _lock = self.mutex.lock().unwrap();
self.should_park.set(false);
// We notify while holding the lock here to avoid races with the target
// thread. In particular, the thread could exit after we unlock the
// mutex, which would make the condvar access invalid memory.
self.condvar.notify_one();
}
}
+187
View File
@@ -0,0 +1,187 @@
// Copyright 2016 Amanieu d'Antras
//
// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
// http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
// http://opensource.org/licenses/MIT>, at your option. This file may not be
// copied, modified, or distributed except according to those terms.
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;
use std::ptr;
use std::mem;
use std::cell::Cell;
use thread_parker::ThreadParker;
use SPIN_LIMIT;
struct ThreadData {
parker: ThreadParker,
next_in_queue: Cell<*const ThreadData>,
// To make everything fit in 1 word we cheat by putting the tail pointer of
// the linked list in the first element of the queue.
queue_tail: Cell<*const ThreadData>,
}
impl ThreadData {
fn new() -> ThreadData {
ThreadData {
parker: ThreadParker::new(),
next_in_queue: Cell::new(ptr::null()),
queue_tail: Cell::new(ptr::null()),
}
}
}
thread_local!(static THREAD_DATA: ThreadData = ThreadData::new());
const LOCKED_BIT: usize = 1;
const QUEUE_LOCKED_BIT: usize = 2;
const QUEUE_MASK: usize = !3;
// Word-sized lock that is used to implement the parking_lot API. Since this
// can't used parking_lot, it instead manages its own queue of waiting threads.
pub struct WordLock {
state: AtomicUsize,
}
impl WordLock {
#[inline]
pub const fn new() -> WordLock {
WordLock { state: AtomicUsize::new(0) }
}
#[inline]
pub unsafe fn lock(&self) {
if self.state
.compare_exchange_weak(0, LOCKED_BIT, Ordering::Acquire, Ordering::Relaxed)
.is_ok() {
return;
}
self.lock_slow();
}
#[inline]
pub unsafe fn unlock(&self) {
if self.state
.compare_exchange_weak(LOCKED_BIT, 0, Ordering::Release, Ordering::Relaxed)
.is_ok() {
return;
}
self.unlock_slow();
}
#[inline(never)]
unsafe fn lock_slow(&self) {
let mut spin_count = 0;
let mut state = self.state.load(Ordering::Relaxed);
loop {
// Grab the lock if it isn't locked, even if there is a queue on it
if state & LOCKED_BIT == 0 {
match self.state
.compare_exchange_weak(state,
state | LOCKED_BIT,
Ordering::Acquire,
Ordering::Relaxed) {
Ok(_) => return,
Err(x) => state = x,
}
continue;
}
// If there is no queue, try spinning a few times
if state & QUEUE_MASK == 0 && spin_count < SPIN_LIMIT {
spin_count += 1;
thread::yield_now();
state = self.state.load(Ordering::Relaxed);
continue;
}
// Spin if the queue is locked
if state & QUEUE_LOCKED_BIT != 0 {
thread::yield_now();
state = self.state.load(Ordering::Relaxed);
continue;
}
// Try locking the queue
if let Err(x) = self.state
.compare_exchange_weak(state,
state | QUEUE_LOCKED_BIT,
Ordering::Acquire,
Ordering::Relaxed) {
state = x;
continue;
}
// Get our thread data
THREAD_DATA.with(|thread_data| {
assert!(mem::align_of_val(thread_data) > !QUEUE_MASK);
// Add our thread to the queue and unlock the queue
thread_data.next_in_queue.set(ptr::null());
thread_data.parker.prepare_park();
let mut queue_head = (state & QUEUE_MASK) as *const ThreadData;
if !queue_head.is_null() {
(*(*queue_head).queue_tail.get()).next_in_queue.set(thread_data);
} else {
queue_head = thread_data;
}
(*queue_head).queue_tail.set(thread_data);
self.state.store((queue_head as usize) | LOCKED_BIT, Ordering::Release);
// Sleep until we are woken up by an unlock
thread_data.parker.park();
});
self.state.load(Ordering::Relaxed);
}
}
#[inline(never)]
unsafe fn unlock_slow(&self) {
let queue_head;
let mut state = self.state.load(Ordering::Relaxed);
loop {
// Unlock directly if there is no queue
if state == LOCKED_BIT {
match self.state
.compare_exchange_weak(LOCKED_BIT, 0, Ordering::Release, Ordering::Relaxed) {
Ok(_) => return,
Err(x) => state = x,
}
continue;
}
// Spin if the queue is locked
if state & QUEUE_LOCKED_BIT != 0 {
thread::yield_now();
state = self.state.load(Ordering::Relaxed);
continue;
}
// Try locking the queue
match self.state
.compare_exchange_weak(state,
state | QUEUE_LOCKED_BIT,
Ordering::Acquire,
Ordering::Relaxed) {
Ok(_) => {
queue_head = (state & QUEUE_MASK) as *mut ThreadData;
break;
}
Err(x) => state = x,
}
}
// At this point the queue is locked and must be non-empty. First remove
// the first entry in the queue.
let new_queue_head = (*queue_head).next_in_queue.get();
if !new_queue_head.is_null() {
(*new_queue_head).queue_tail.set((*queue_head).queue_tail.get());
}
self.state.store(new_queue_head as usize, Ordering::Release);
// Then wake up the thread we removed from the queue
(*queue_head).parker.unpark();
}
}