Bug 1716518 - Upgrade triple_buffer to v5.0.6.

Differential Revision: https://phabricator.services.mozilla.com/D117872

Depends on D117871
Mike Hommey 2021-06-15 09:25:51 +00:00
parent ce7d38e96b
commit 22ec82ac35
7 changed files with 391 additions and 338 deletions

Cargo.lock (generated)

@@ -5358,9 +5358,9 @@ checksum = "ce607aae8ab0ab3abf3a2723a9ab6f09bb8639ed83fdd888d857b8e556c868d8"
[[package]]
name = "triple_buffer"
-version = "5.0.5"
+version = "5.0.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "06577fa2229f6eff69f06ba2e08a27458f32f87a7985abb5047d7bd2e0006512"
+checksum = "803966e5a8397a70d3d8111afa1597ba8381346d7de4720e9f539471d371a1a3"
dependencies = [
 "cache-padded",
]

third_party/rust/triple_buffer/.cargo-checksum.json

@@ -1 +1 @@
-{"files":{"CHANGELOG.md":"878a0261b1281e00769cbec862fc6bd37a72280099ec1dd5504fdef12745b0b6","Cargo.toml":"0620bdb8ee9a324fa6c49c56802a2ee1a2d5a59b325b635ac314243d82507959","LICENSE":"4b89d4518bd135ab4ee154a7bce722246b57a98c3d7efc1a09409898160c2bd1","README.md":"662e345ee3319bb82a79afb6dd136c5c956007d35b9a2962cd1d743ee321c7c5","src/lib.rs":"eccdd71723a03ffb8a59e9f9740d408512f44f83d839724bfcb0c630149f140d"},"package":"06577fa2229f6eff69f06ba2e08a27458f32f87a7985abb5047d7bd2e0006512"}
+{"files":{"CHANGELOG.md":"f5b061b67b5b4b4e2dc6ce129049c9fd5975700ed25a9e91905bedc8374301b8","Cargo.toml":"b0546f2f8310daf982602646f128dca5a695e248cfd30b610fed7dd25efdcd54","LICENSE":"4b89d4518bd135ab4ee154a7bce722246b57a98c3d7efc1a09409898160c2bd1","README.md":"a8a254f626f8903fb7c1446fc95c7b0d6bddfe535e089550f73e43fd7d30d026","benches/benchmarks.rs":"c7592c9d442ac61c34585f0e99e3dde941fe0a707ad1590d4ec4f16ec338f90b","src/lib.rs":"95d7a6c1e0033525dee8d1a0b4431b2a9c023debb18bc78132c5cb37feff19f1"},"package":"803966e5a8397a70d3d8111afa1597ba8381346d7de4720e9f539471d371a1a3"}

third_party/rust/triple_buffer/CHANGELOG.md

@@ -11,6 +11,31 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
_No unreleased changes in the pipeline at the moment._
+## [5.0.6] - 2021-01-16
+### Added
+- As a result of the bugfix mentioned below, there is no performance motivation
+  to gate `raw` features behind a feature flag, so those features are now
+  available by default without a `raw_` prefix. Usage of the `raw_` prefix and
+  the `raw` feature flag is deprecated and these may be removed in a future
+  major release, but there is no harm in keeping them indefinitely for now.
+### Changed
+- Benchmarks now use `criterion`, and have been significantly cleaned up along
+  the way. They are now more extensive and more reliable.
+- Moved MSRV to Rust 1.36 because we now use crossbeam for testing, which
+  requires that much. The crate itself should still support Rust 1.34 for now,
+  but we cannot test that it continues doing so...
+### Fixed
+- Removed a possible data race that was not observed on current hardware,
+  but could be triggered by future hardware or compiler evolutions. See
+  https://github.com/HadrienG2/triple-buffer/issues/14.
## [5.0.5] - 2020-07-05
### Changed
@@ -227,7 +252,8 @@ _No unreleased changes in the pipeline at the moment._
-[Unreleased]: https://github.com/HadrienG2/triple-buffer/compare/v5.0.5...HEAD
+[Unreleased]: https://github.com/HadrienG2/triple-buffer/compare/v5.0.6...HEAD
+[5.0.6]: https://github.com/HadrienG2/triple-buffer/compare/v5.0.5...v5.0.6
[5.0.5]: https://github.com/HadrienG2/triple-buffer/compare/v5.0.4...v5.0.5
[5.0.4]: https://github.com/HadrienG2/triple-buffer/compare/v5.0.3...v5.0.4
[5.0.3]: https://github.com/HadrienG2/triple-buffer/compare/v5.0.2...v5.0.3
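In practice, the deprecation described in this changelog entry is a plain rename. Here is a minimal sketch of what it means for client code, using only methods that appear in this diff (the surrounding program is my illustration):

```rust
use triple_buffer::TripleBuffer;

fn main() {
    let (mut input, mut output) = TripleBuffer::new(String::new()).split();

    // Before 5.0.6, in-place access required the `raw` cargo feature and the
    // `raw_`-prefixed methods: raw_input_buffer(), raw_publish(), raw_update()
    // and raw_output_buffer(). As of 5.0.6, the same methods are available by
    // default, without the prefix:
    input.input_buffer().push_str("hello");
    input.publish();

    if output.update() {
        println!("got: {}", output.output_buffer());
    }
}
```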

third_party/rust/triple_buffer/Cargo.toml

@@ -13,7 +13,7 @@
[package]
edition = "2018"
name = "triple_buffer"
-version = "5.0.5"
+version = "5.0.6"
authors = ["Hadrien G. <knights_of_ni@gmx.com>"]
description = "An implementation of triple buffering, useful for sharing frequently updated data between threads"
documentation = "https://docs.rs/triple_buffer/"
@@ -22,12 +22,20 @@ keywords = ["synchronization", "spsc", "multithreading", "non-blocking", "wait-f
categories = ["algorithms", "asynchronous", "concurrency", "data-structures"]
license = "MPL-2.0"
repository = "https://github.com/HadrienG2/triple-buffer"
+[package.metadata.docs.rs]
+all-features = true
+[lib]
+bench = false
+[[bench]]
+name = "benchmarks"
+harness = false
[dependencies.cache-padded]
version = "1.1"
+[dev-dependencies.criterion]
+version = "0.3"
[dev-dependencies.testbench]
-version = "0.7"
+version = "0.8"
[features]
raw = []

third_party/rust/triple_buffer/README.md

@@ -3,7 +3,7 @@
[![On crates.io](https://img.shields.io/crates/v/triple_buffer.svg)](https://crates.io/crates/triple_buffer)
[![On docs.rs](https://docs.rs/triple_buffer/badge.svg)](https://docs.rs/triple_buffer/)
[![Continuous Integration](https://github.com/HadrienG2/triple-buffer/workflows/Continuous%20Integration/badge.svg)](https://github.com/HadrienG2/triple-buffer/actions?query=workflow%3A%22Continuous+Integration%22)
-![Requires rustc 1.34+](https://img.shields.io/badge/rustc-1.34+-red.svg)
+![Requires rustc 1.36+](https://img.shields.io/badge/rustc-1.36+-red.svg)
## What is this?
@@ -34,14 +34,10 @@ assert_eq!(*latest_value_ref, 42);
```
In situations where moving the original value away and being unable to
-modify it after the fact is too costly, such as if creating a new value
-involves dynamic memory allocation, you can opt into the lower-level "raw"
-interface, which allows you to access the buffer's data in place and
-precisely control when updates are propagated.
-This data access method is more error-prone and comes at a small performance
-cost, which is why you will need to enable it explicitly using the "raw"
-[cargo feature](http://doc.crates.io/manifest.html#usage-in-end-products).
+modify it on the consumer's side is too costly, such as if creating a new
+value involves dynamic memory allocation, you can use a lower-level API
+which allows you to access the producer's and consumer's buffers in place
+and to precisely control when updates are propagated:
```rust
// Create and split a triple buffer
@@ -52,27 +48,27 @@ let (mut buf_input, mut buf_output) = buf.split();
// Mutate the input buffer in place
{
    // Acquire a reference to the input buffer
-    let raw_input = buf_input.raw_input_buffer();
+    let input = buf_input.input_buffer();
    // In general, you don't know what's inside of the buffer, so you should
    // always reset the value before use (this is a type-specific process).
-    raw_input.clear();
+    input.clear();
    // Perform an in-place update
-    raw_input.push_str("Hello, ");
+    input.push_str("Hello, ");
}
-// Publish the input buffer update
-buf_input.raw_publish();
+// Publish the above input buffer update
+buf_input.publish();
// Manually fetch the buffer update from the consumer interface
-buf_output.raw_update();
+buf_output.update();
// Acquire a mutable reference to the output buffer
-let raw_output = buf_output.raw_output_buffer();
+let output = buf_output.output_buffer();
// Post-process the output value before use
-raw_output.push_str("world!");
+output.push_str("world!");
```
@@ -89,7 +85,12 @@ Compared to a mutex:
- Does not allow in-place updates, as the producer and consumer do not access
  the same memory location
- Should be slower if updates are rare and in-place updates are much more
-  efficient than moves, faster otherwise.
+  efficient than moves, comparable or faster otherwise.
+  * Mutexes and triple buffering have comparably low overhead on the happy path
+    (checking a flag), which is systematically taken when updates are rare. In
+    this scenario, in-place updates can give mutexes a performance advantage.
+    Where triple buffering shines is when a reader often collides with a writer,
+    which is handled very poorly by mutexes.
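For context, here is a sketch of the mutex-based alternative that this comparison has in mind; this is my illustration, not part of the diff. Note how the writer can stall whenever the reader holds the lock, which is exactly the collision case that triple buffering avoids by construction:

```rust
use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
    // Shared "latest value" cell, mutex edition: both sides lock the same data
    let shared = Arc::new(Mutex::new(0u64));
    let writer = Arc::clone(&shared);

    let handle = thread::spawn(move || {
        for i in 0..1_000 {
            // Blocks whenever the reader happens to hold the lock
            *writer.lock().unwrap() = i;
        }
    });

    // In-place updates are trivial with a mutex, and cheap when collisions
    // are rare, which is where mutexes can come out ahead
    *shared.lock().unwrap() += 1;
    handle.join().unwrap();
}
```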
Compared to the read-copy-update (RCU) primitive from the Linux kernel:
@@ -97,12 +98,15 @@ Compared to the read-copy-update (RCU) primitive from the Linux kernel:
- Has higher dirty read overhead on relaxed-memory architectures (ARM, POWER...)
- Does not require accounting for reader "grace periods": once the reader has
  gotten access to the latest value, the synchronization transaction is over
-- Does not use the inefficient compare-and-swap hardware primitive on update
+- Does not use the compare-and-swap hardware primitive on update, which is
+  inefficient by design as it forces its users to retry transactions in a loop.
- Does not suffer from the ABA problem, allowing much simpler code
- Allocates memory on initialization only, rather than on every update
- May use more memory (3x payload + 3x bytes vs 1x pointer + amount of
  payloads and refcounts that depends on the readout and update pattern)
- Should be slower if updates are rare, faster if updates are frequent
+  * The RCU's happy reader path is slightly faster (no flag to check), but its
+    update procedure is much more involved and costly.
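To make the compare-and-swap point concrete, here is an illustrative sketch (mine, not from the diff) of the retry loop that CAS-based updates impose; `triple-buffer` instead publishes with a single unconditional atomic `swap`, as the `src/lib.rs` hunks below show:

```rust
use std::sync::atomic::{AtomicU64, Ordering};

// A CAS-based update must re-run until no other thread raced with it
fn cas_increment(counter: &AtomicU64) {
    let mut current = counter.load(Ordering::Relaxed);
    loop {
        match counter.compare_exchange_weak(
            current,
            current + 1,
            Ordering::AcqRel,
            Ordering::Relaxed,
        ) {
            Ok(_) => break,
            // Another thread won the race: retry with the fresh value
            Err(actual) => current = actual,
        }
    }
}
```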
Compared to sending the updates on a message queue:
@@ -110,9 +114,15 @@ Compared to sending the updates on a message queue:
  other scenarios, although the implementations are much less efficient)
- Consumer only has access to the latest state, not the previous ones
- Consumer does not *need* to get through every previous state
-- Is nonblocking AND uses bounded amounts of memory (with queues, it's a choice)
+- Is nonblocking AND uses bounded amounts of memory (with queues, it's a choice,
+  unless you use one of those evil queues that silently drop data when full)
- Can transmit information in a single move, rather than two
-- Should be faster for any compatible use case
+- Should be faster for any compatible use case.
+  * Queues force you to move data twice, once in, once out, which will incur a
+    significant cost for any nontrivial data. If the inner data requires
+    allocation, they force you to allocate for every transaction. By design,
+    they force you to store and go through every update, which is not useful
+    when you're only interested in the latest version of the data.
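As a quick illustration of the double move and backlog that the last point describes (my sketch, using the standard library channel rather than any particular queue crate):

```rust
use std::sync::mpsc;

fn main() {
    let (tx, rx) = mpsc::channel();

    // Each update is moved into the queue...
    for i in 0..10 {
        tx.send(format!("state {}", i)).unwrap();
    }

    // ...and the consumer must move every element out again and walk the
    // whole backlog just to reach the latest state, which a triple buffer
    // hands over in a single move
    if let Some(latest) = rx.try_iter().last() {
        println!("latest: {}", latest);
    }
}
```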
In short, triple buffering is what you're after in scenarios where a shared
memory location is updated frequently by a single writer, read by a single
@@ -135,13 +145,13 @@ I'd like it to be.
First of all, we have sequential tests, which are very thorough but obviously
do not check the lock-free/synchronization part. You run them as follows:
-$ cargo test && cargo test --features raw
+$ cargo test
-Then we have a concurrent test where a reader thread continuously observes the
-values from a rate-limited writer thread, and makes sure that he can see every
-single update without any incorrect value slipping in the middle.
+Then we have concurrent tests where, for example, a reader thread continuously
+observes the values from a rate-limited writer thread, and makes sure that it
+can see every single update without any incorrect value slipping in the middle.
-This test is more important, but it is also harder to run because one must first
+These tests are more important, but also harder to run because one must first
check some assumptions:
- The testing host must have at least 2 physical CPU cores to test all possible
@@ -149,44 +159,57 @@ check some assumptions:
- No other code should be eating CPU in the background. Including other tests.
- As the proper writing rate is system-dependent, what is configured in this
  test may not be appropriate for your machine.
- You must test in release mode, as compiler optimizations tend to create more
  opportunities for race conditions.
-Taking this and the relatively long run time (~10-20 s) into account, this test
-is ignored by default.
-Finally, we have benchmarks, which allow you to test how well the code is
-performing on your machine. Because cargo bench has not yet landed in Stable
-Rust, these benchmarks masquerade as tests, which make them a bit unpleasant to
-run. I apologize for the inconvenience.
-To run the concurrent test and the benchmarks, make sure no one is eating CPU in
-the background and do:
+Taking this and the relatively long run time (~10-20 s) into account, the
+concurrent tests are ignored by default. To run them, make sure nothing is
+eating CPU in the background and do:
$ cargo test --release -- --ignored --nocapture --test-threads=1
-(As before, you can also test with `--features raw`)
+Finally, we have benchmarks, which allow you to test how well the code is
+performing on your machine. We are now using `criterion` for said benchmarks,
+which means that to run them, you can simply do:
-Here is a guide to interpreting the benchmark results:
+$ cargo bench
-* `clean_read` measures the triple buffer readout time when the data has not
-  changed. It should be extremely fast (a couple of CPU clock cycles).
-* `write` measures the amount of time it takes to write data in the triple
-  buffer when no one is reading.
-* `write_and_dirty_read` performs a write as before, immediately followed by a
-  sequential read. To get the dirty read performance, substract the write time
-  from that result. Writes and dirty read should take comparable time.
-* `concurrent_write` measures the write performance when a reader is
-  continuously reading. Expect significantly worse performance: lock-free
-  techniques can help against contention, but are not a panacea.
-* `concurrent_read` measures the read performance when a writer is continuously
-  writing. Again, a significant hit is to be expected.
+These benchmarks exercise the worst-case scenario of `u8` payloads, where
+synchronization overhead dominates as the cost of reading and writing the
+actual data is only 1 cycle. In real-world use cases, you will spend more time
+updating buffers and less time synchronizing them.
-On an Intel Xeon E5-1620 v3 @ 3.50GHz, typical results are as follows:
+However, due to the artificial nature of microbenchmarking, the benchmarks must
+exercise two scenarios which are respectively overly optimistic and overly
+pessimistic:
-* Write: 7.8 ns
-* Clean read: 1.8 ns
-* Dirty read: 9.3 ns
-* Concurrent write: 45 ns
-* Concurrent read: 126 ns
+1. In uncontended mode, the buffer input and output reside on the same CPU core,
+   which underestimates the overhead of transferring modified cache lines from
+   the L1 cache of the source CPU to that of the destination CPU.
+   * This is not as bad as it sounds, because you will pay this overhead no
+     matter what kind of thread synchronization primitive you use, so we're not
+     hiding `triple-buffer` specific overhead here. All you need to do is to
+     ensure that when comparing against another synchronization primitive, that
+     primitive is benchmarked in a similar way.
+2. In contended mode, the benchmarked half of the triple buffer is operating
+   under maximal load from the other half, which is much more busy than what is
+   actually going to be observed in real-world workloads.
+   * In this configuration, what you're essentially measuring is the performance
+     of your CPU's cache line locking protocol and inter-CPU core data
+     transfers under the shared data access pattern of `triple-buffer`.
+Therefore, consider these benchmarks' timings as orders of magnitude of the best
+and the worst that you can expect from `triple-buffer`, where actual performance
+will be somewhere in between these two numbers depending on your workload.
+On an Intel Core i3-3220 CPU @ 3.30GHz, typical results are as follows:
+* Clean read: 0.9 ns
+* Write: 6.9 ns
+* Write + dirty read: 19.6 ns
+* Dirty read (estimated): 12.7 ns
+* Contended write: 60.8 ns
+* Contended read: 59.2 ns
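One practical note, assuming standard `criterion` behavior rather than anything this README states: benchmark names can be filtered from the command line, so the benchmark groups defined in `benches/benchmarks.rs` below can be run separately:

$ cargo bench -- uncontended
$ cargo bench -- "read contention"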
## License

third_party/rust/triple_buffer/benches/benchmarks.rs

@@ -0,0 +1,80 @@
use criterion::{black_box, criterion_group, criterion_main, Criterion};
use triple_buffer::TripleBuffer;

pub fn benchmark(c: &mut Criterion) {
    let (mut input, mut output) = TripleBuffer::<u8>::default().split();
    {
        let mut uncontended = c.benchmark_group("uncontended");
        uncontended.bench_function("read output", |b| b.iter(|| *output.output_buffer()));
        uncontended.bench_function("clean update", |b| {
            b.iter(|| {
                output.update();
            })
        });
        uncontended.bench_function("clean receive", |b| b.iter(|| *output.read()));
        uncontended.bench_function("write input", |b| {
            b.iter(|| {
                *input.input_buffer() = black_box(0);
            })
        });
        uncontended.bench_function("publish", |b| {
            b.iter(|| {
                input.publish();
            })
        });
        uncontended.bench_function("send", |b| b.iter(|| input.write(black_box(0))));
        uncontended.bench_function("publish + dirty update", |b| {
            b.iter(|| {
                input.publish();
                output.update();
            })
        });
        uncontended.bench_function("transmit", |b| {
            b.iter(|| {
                input.write(black_box(0));
                *output.read()
            })
        });
    }
    {
        let mut read_contended = c.benchmark_group("read contention");
        testbench::run_under_contention(
            || black_box(*output.read()),
            || {
                read_contended.bench_function("write input", |b| {
                    b.iter(|| {
                        *input.input_buffer() = black_box(0);
                    })
                });
                read_contended.bench_function("publish", |b| {
                    b.iter(|| {
                        input.publish();
                    })
                });
                read_contended.bench_function("send", |b| b.iter(|| input.write(black_box(0))));
            },
        );
    }
    {
        let mut write_contended = c.benchmark_group("write contention");
        testbench::run_under_contention(
            || input.write(black_box(0)),
            || {
                write_contended
                    .bench_function("read output", |b| b.iter(|| *output.output_buffer()));
                write_contended.bench_function("update", |b| {
                    b.iter(|| {
                        output.update();
                    })
                });
                write_contended.bench_function("receive", |b| b.iter(|| *output.read()));
            },
        );
    }
}

criterion_group!(benches, benchmark);
criterion_main!(benches);

third_party/rust/triple_buffer/src/lib.rs

@@ -8,9 +8,9 @@
//!
//! # Examples
//!
-//! For many use cases, you can use the default interface, designed for maximal
-//! ergonomics and synchronization performance, which is based on moving values
-//! into the buffer and subsequently accessing them via shared references:
+//! For many use cases, you can use the ergonomic write/read interface, where
+//! the producer moves values into the buffer and the consumer accesses the
+//! latest buffer by shared reference:
//!
//! ```
//! // Create a triple buffer
@@ -30,18 +30,12 @@
//! ```
//!
//! In situations where moving the original value away and being unable to
-//! modify it after the fact is too costly, such as if creating a new value
-//! involves dynamic memory allocation, you can opt into the lower-level "raw"
-//! interface, which allows you to access the buffer's data in place and
-//! precisely control when updates are propagated.
-//!
-//! This data access method is more error-prone and comes at a small performance
-//! cost, which is why you will need to enable it explicitly using the "raw"
-//! [cargo feature](http://doc.crates.io/manifest.html#usage-in-end-products).
+//! modify it on the consumer's side is too costly, such as if creating a new
+//! value involves dynamic memory allocation, you can use a lower-level API
+//! which allows you to access the producer's and consumer's buffers in place
+//! and to precisely control when updates are propagated:
//!
//! ```
-//! # #[cfg(feature = "raw")]
-//! # {
//! // Create and split a triple buffer
//! use triple_buffer::TripleBuffer;
//! let buf = TripleBuffer::new(String::with_capacity(42));
@@ -50,28 +44,27 @@
//! // Mutate the input buffer in place
//! {
//!     // Acquire a reference to the input buffer
-//!     let raw_input = buf_input.raw_input_buffer();
+//!     let input = buf_input.input_buffer();
//!
//!     // In general, you don't know what's inside of the buffer, so you should
//!     // always reset the value before use (this is a type-specific process).
-//!     raw_input.clear();
+//!     input.clear();
//!
//!     // Perform an in-place update
-//!     raw_input.push_str("Hello, ");
+//!     input.push_str("Hello, ");
//! }
//!
-//! // Publish the input buffer update
-//! buf_input.raw_publish();
+//! // Publish the above input buffer update
+//! buf_input.publish();
//!
//! // Manually fetch the buffer update from the consumer interface
-//! buf_output.raw_update();
+//! buf_output.update();
//!
//! // Acquire a mutable reference to the output buffer
-//! let raw_output = buf_output.raw_output_buffer();
+//! let output = buf_output.output_buffer();
//!
//! // Post-process the output value before use
-//! raw_output.push_str("world!");
-//! # }
+//! output.push_str("world!");
//! ```
#![deny(missing_debug_implementations, missing_docs)]
@@ -93,10 +86,6 @@ use std::{
/// submits regular updates, and the consumer accesses the latest available
/// value whenever it feels like it.
///
-/// The input and output fields of this struct are what producers and consumers
-/// actually use in practice. They can safely be moved away from the
-/// TripleBuffer struct after construction, and are further documented below.
-///
#[derive(Debug)]
pub struct TripleBuffer<T: Send> {
/// Input object used by producers to send updates
@@ -108,6 +97,11 @@ pub struct TripleBuffer<T: Send> {
//
impl<T: Clone + Send> TripleBuffer<T> {
/// Construct a triple buffer with a certain initial value
+//
+// FIXME: After spending some time thinking about this further, I reached
+//        the conclusion that clippy was right after all. But since this is
+//        a breaking change, I'm keeping that for the next major release.
+//
#[allow(clippy::needless_pass_by_value)]
pub fn new(initial: T) -> Self {
Self::new_impl(|| initial.clone())
@@ -141,12 +135,22 @@ impl<T: Send> TripleBuffer<T> {
}
/// Extract input and output of the triple buffer
+//
+// NOTE: Although it would be nicer to directly return `Input` and `Output`
+//       from `new()`, the `split()` design gives some API evolution
+//       headroom towards future allocation-free modes of operation where
+//       the SharedState is a static variable, or a stack-allocated variable
+//       used through scoped threads or other unsafe thread synchronization.
+//
+// See https://github.com/HadrienG2/triple-buffer/issues/8.
+//
pub fn split(self) -> (Input<T>, Output<T>) {
(self.input, self.output)
}
}
//
-// The Clone and PartialEq traits are used internally for testing.
+// The Clone and PartialEq traits are used internally for testing and I don't
+// want to commit to supporting them publicly for now.
//
#[doc(hidden)]
impl<T: Clone + Send> Clone for TripleBuffer<T> {
@@ -223,77 +227,68 @@ impl<T: Send> Input<T> {
back_info & BACK_DIRTY_BIT == 0
}
-/// Get raw access to the input buffer
+/// Access the input buffer directly
///
/// This advanced interface allows you to update the input buffer in place,
-/// which can in some case improve performance by avoiding to create values
-/// of type T repeatedy when this is an expensive process.
+/// so that you can avoid creating values of type T repeatedly just to push
+/// them into the triple buffer when doing so is expensive.
///
-/// However, by opting into it, you force yourself to take into account
-/// subtle implementation details which you could normally ignore.
+/// However, by using it, you force yourself to take into account some
+/// implementation subtleties that you could normally ignore.
///
-/// First, the buffer does not contain the last value that you sent (which
-/// is now into the hands of the consumer). In fact, the consumer is allowed
-/// to write complete garbage into it if it feels so inclined. All you can
-/// safely assume is that it contains a valid value of type T.
+/// First, the buffer does not contain the last value that you published
+/// (which is now available to the consumer thread). In fact, what you get
+/// may not match _any_ value that you sent in the past, but rather be a new
+/// value that was written in there by the consumer thread. All you can
+/// safely assume is that the buffer contains a valid value of type T, which
+/// you may need to "clean up" before use using a type-specific process.
///
/// Second, we do not send updates automatically. You need to call
-/// raw_publish() in order to propagate a buffer update to the consumer.
-/// Alternative designs based on Drop were considered, but ultimately deemed
-/// too magical for the target audience of this method.
+/// `publish()` in order to propagate a buffer update to the consumer.
+/// Alternative designs based on Drop were considered, but deemed too
+/// magical for the target audience of this interface.
///
-/// To use this method, you have to enable the crate's `raw` feature
-#[cfg(any(feature = "raw", test))]
-pub fn raw_input_buffer(&mut self) -> &mut T {
-self.input_buffer()
-}
-/// Unconditionally publish an update, checking for overwrites
-///
-/// After updating the input buffer using raw_input_buffer(), you can use
-/// this method to publish your updates to the consumer. It will send back
-/// an output flag which tells you whether an unread value was overwritten.
-///
-/// To use this method, you have to enable the crate's `raw` feature
-#[cfg(any(feature = "raw", test))]
-pub fn raw_publish(&mut self) -> bool {
-self.publish()
-}
-}
-//
-// Internal interface
-impl<T: Send> Input<T> {
-/// Access the input buffer
-///
-/// This is safe because the synchronization protocol ensures that we have
-/// exclusive access to this buffer.
-///
-fn input_buffer(&mut self) -> &mut T {
+pub fn input_buffer(&mut self) -> &mut T {
+// This is safe because the synchronization protocol ensures that we
+// have exclusive access to this buffer.
let input_ptr = self.shared.buffers[self.input_idx as usize].get();
unsafe { &mut *input_ptr }
}
-/// Tell which memory ordering should be used for buffer swaps
+/// Publish the current input buffer, checking for overwrites
///
-/// The right answer depends on whether the consumer is allowed to write
-/// into the output buffer or not. If it can, then we must synchronize with
-/// its writes. If not, we only need to propagate our own writes.
+/// After updating the input buffer using `input_buffer()`, you can use this
+/// method to publish your updates to the consumer.
///
-fn swap_ordering() -> Ordering {
-if cfg!(feature = "raw") {
-Ordering::AcqRel
-} else {
-Ordering::Release
-}
-}
-/// Publish an update, checking for overwrites (internal version)
-fn publish(&mut self) -> bool {
+/// This will replace the current input buffer with another one, as you
+/// cannot continue using the old one while the consumer is accessing it.
+///
+/// It will also tell you whether you overwrote a value which was not read
+/// by the consumer thread.
+///
+pub fn publish(&mut self) -> bool {
// Swap the input buffer and the back buffer, setting the dirty bit
-let former_back_info = self.shared.back_info.swap(
-self.input_idx | BACK_DIRTY_BIT,
-Self::swap_ordering(), // Propagate buffer updates as well
-);
+//
+// The ordering must be AcqRel, because...
+//
+// - Our accesses to the old buffer must not be reordered after this
+//   operation (which mandates Release ordering), otherwise they could
+//   race with the consumer accessing the freshly published buffer.
+// - Our accesses from the buffer must not be reordered before this
+//   operation (which mandates Consume ordering, that is best
+//   approximated by Acquire in Rust), otherwise they would race with
+//   the consumer accessing the buffer as well before switching to
+//   another buffer.
+//   * This reordering may seem paradoxical, but could happen if the
+//     compiler or CPU correctly speculated the new buffer's index
+//     before that index is actually read, as well as on weird hardware
+//     with incoherent caches like GPUs or old DEC Alpha where keeping
+//     data in sync across cores requires manual action.
+//
+let former_back_info = self
+.shared
+.back_info
+.swap(self.input_idx | BACK_DIRTY_BIT, Ordering::AcqRel);
// The old back buffer becomes our new input buffer
self.input_idx = former_back_info & BACK_INDEX_MASK;
@@ -301,6 +296,30 @@ impl<T: Send> Input<T> {
// Tell whether we have overwritten unread data
former_back_info & BACK_DIRTY_BIT != 0
}
+/// Deprecated alias to `input_buffer()`, please use that method instead
+#[cfg(any(feature = "raw", test))]
+#[deprecated(
+since = "5.0.5",
+note = "The \"raw\" feature is deprecated as the performance \
+optimization that motivated it turned out to be incorrect. \
+All functionality is now available without using feature flags."
+)]
+pub fn raw_input_buffer(&mut self) -> &mut T {
+self.input_buffer()
+}
+/// Deprecated alias to `publish()`, please use that method instead
+#[cfg(any(feature = "raw", test))]
+#[deprecated(
+since = "5.0.5",
+note = "The \"raw\" feature is deprecated as the performance \
+optimization that motivated it turned out to be incorrect. \
+All functionality is now available without using feature flags."
+)]
+pub fn raw_publish(&mut self) -> bool {
+self.publish()
+}
}
/// Consumer interface to the triple buffer
@@ -343,73 +362,41 @@ impl<T: Send> Output<T> {
back_info & BACK_DIRTY_BIT != 0
}
-/// Get raw access to the output buffer
+/// Access the output buffer directly
///
/// This advanced interface allows you to modify the contents of the output
-/// buffer, which can in some case improve performance by avoiding to create
-/// values of type T when this is an expensive process. One possible
-/// application, for example, is to post-process values from the producer.
+/// buffer, so that you can avoid copying the output value when this is an
+/// expensive process. One possible application, for example, is to
+/// post-process values from the producer before use.
///
-/// However, by opting into it, you force yourself to take into account
-/// subtle implementation details which you could normally ignore.
+/// However, by using it, you force yourself to take into account some
+/// implementation subtleties that you could normally ignore.
///
/// First, keep in mind that you can lose access to the current output
-/// buffer any time read() or raw_update() is called, as it will be replaced
+/// buffer any time `read()` or `update()` is called, as it may be replaced
/// by an updated buffer from the producer automatically.
///
/// Second, to reduce the potential for the aforementioned usage error, this
/// method does not update the output buffer automatically. You need to call
-/// raw_update() in order to fetch buffer updates from the producer.
+/// `update()` in order to fetch buffer updates from the producer.
///
-/// To use this method, you have to enable the crate's `raw` feature
-#[cfg(any(feature = "raw", test))]
-pub fn raw_output_buffer(&mut self) -> &mut T {
-self.output_buffer()
-}
-/// Update the output buffer
-///
-/// Check for incoming updates from the producer, and if so, update our
-/// output buffer to the latest data version. This operation will overwrite
-/// any changes which you may have commited into the output buffer.
-///
-/// Return a flag telling whether an update was carried out
-///
-/// To use this method, you have to enable the crate's `raw` feature
-#[cfg(any(feature = "raw", test))]
-pub fn raw_update(&mut self) -> bool {
-self.update()
-}
-}
-//
-// Internal interface
-impl<T: Send> Output<T> {
-/// Access the output buffer (internal version)
-///
-/// This is safe because the synchronization protocol ensures that we have
-/// exclusive access to this buffer.
-///
-fn output_buffer(&mut self) -> &mut T {
+pub fn output_buffer(&mut self) -> &mut T {
+// This is safe because the synchronization protocol ensures that we
+// have exclusive access to this buffer.
let output_ptr = self.shared.buffers[self.output_idx as usize].get();
unsafe { &mut *output_ptr }
}
-/// Tell which memory ordering should be used for buffer swaps
+/// Update the output buffer
///
-/// The right answer depends on whether the client is allowed to write into
-/// the output buffer or not. If it can, then we must propagate these writes
-/// back to the producer. Otherwise, we only need to fetch producer updates.
+/// Check if the producer submitted a new data version, and if one is
+/// available, update our output buffer to use it. Return a flag that tells
+/// you whether such an update was carried out.
///
-fn swap_ordering() -> Ordering {
-if cfg!(feature = "raw") {
-Ordering::AcqRel
-} else {
-Ordering::Acquire
-}
-}
-/// Check out incoming output buffer updates (internal version)
-fn update(&mut self) -> bool {
+/// Bear in mind that when this happens, you will lose any change that you
+/// performed to the output buffer via the `output_buffer()` interface.
+///
+pub fn update(&mut self) -> bool {
// Access the shared state
let shared_state = &(*self.shared);
@@ -419,10 +406,25 @@ impl<T: Send> Output<T> {
// If so, exchange our output buffer with the back-buffer, thusly
// acquiring exclusive access to the old back buffer while giving
// the producer a new back-buffer to write to.
-let former_back_info = shared_state.back_info.swap(
-self.output_idx,
-Self::swap_ordering(), // Synchronize with buffer updates
-);
+//
+// The ordering must be AcqRel, because...
+//
+// - Our accesses to the previous buffer must not be reordered after
+//   this operation (which mandates Release ordering), otherwise
+//   they could race with the producer accessing the freshly
+//   liberated buffer.
+// - Our accesses from the buffer must not be reordered before this
+//   operation (which mandates Consume ordering, that is best
+//   approximated by Acquire in Rust), otherwise they would race
+//   with the producer writing into the buffer before publishing it.
+//   * This reordering may seem paradoxical, but could happen if the
+//     compiler or CPU correctly speculated the new buffer's index
+//     before that index is actually read, as well as on weird hardware
+//     like GPUs where CPU caches require manual synchronization.
+//
+let former_back_info = shared_state
+.back_info
+.swap(self.output_idx, Ordering::AcqRel);
// Make the old back-buffer our new output buffer
self.output_idx = former_back_info & BACK_INDEX_MASK;
@@ -431,6 +433,30 @@ impl<T: Send> Output<T> {
// Tell whether an update was carried out
updated
}
+/// Deprecated alias to `output_buffer()`, please use that method instead
+#[cfg(any(feature = "raw", test))]
+#[deprecated(
+since = "5.0.5",
+note = "The \"raw\" feature is deprecated as the performance \
+optimization that motivated it turned out to be incorrect. \
+All functionality is now available without using feature flags."
+)]
+pub fn raw_output_buffer(&mut self) -> &mut T {
+self.output_buffer()
+}
+/// Deprecated alias to `update()`, please use that method instead
+#[cfg(any(feature = "raw", test))]
+#[deprecated(
+since = "5.0.5",
+note = "The \"raw\" feature is deprecated as the performance \
+optimization that motivated it turned out to be incorrect. \
+All functionality is now available without using feature flags."
+)]
+pub fn raw_update(&mut self) -> bool {
+self.update()
+}
}
/// Triple buffer shared state
@@ -500,13 +526,13 @@ impl<T: PartialEq + Send> SharedState<T> {
//
unsafe impl<T: Send> Sync for SharedState<T> {}
-/// Index types used for triple buffering
-///
-/// These types are used to index into triple buffers. In addition, the
-/// BackBufferInfo type is actually a bitfield, whose third bit (numerical
-/// value: 4) is set to 1 to indicate that the producer published an update into
-/// the back-buffer, and reset to 0 when the consumer fetches the update.
-///
+// Index types used for triple buffering
+//
+// These types are used to index into triple buffers. In addition, the
+// BackBufferInfo type is actually a bitfield, whose third bit (numerical
+// value: 4) is set to 1 to indicate that the producer published an update into
+// the back-buffer, and reset to 0 when the consumer fetches the update.
+//
type BufferIndex = u8;
type BackBufferInfo = BufferIndex;
//
@@ -670,12 +696,12 @@ mod tests {
let old_output_idx = old_buf.output.output_idx;
// Check that updating from a clean state works
-assert!(!buf.output.raw_update());
+assert!(!buf.output.update());
assert_eq!(buf, old_buf);
check_buf_state(&mut buf, false);
// Check that publishing from a clean state works
-assert!(!buf.input.raw_publish());
+assert!(!buf.input.publish());
let mut expected_buf = old_buf.clone();
expected_buf.input.input_idx = old_back_idx;
expected_buf
@@ -687,7 +713,7 @@
check_buf_state(&mut buf, true);
// Check that overwriting a dirty state works
-assert!(buf.input.raw_publish());
+assert!(buf.input.publish());
let mut expected_buf = old_buf.clone();
expected_buf.input.input_idx = old_input_idx;
expected_buf
@@ -699,7 +725,7 @@
check_buf_state(&mut buf, true);
// Check that updating from a dirty state works
-assert!(buf.output.raw_update());
+assert!(buf.output.update());
expected_buf.output.output_idx = old_back_idx;
expected_buf
.output
@@ -728,8 +754,8 @@
let mut expected_buf = old_buf.clone();
// ...write the new value in and swap...
-*expected_buf.input.raw_input_buffer() = true;
-expected_buf.input.raw_publish();
+*expected_buf.input.input_buffer() = true;
+expected_buf.input.publish();
// Nothing else should have changed
assert_eq!(buf, expected_buf);
@@ -757,7 +783,7 @@
// Result should be equivalent to carrying out an update
let mut expected_buf = old_buf.clone();
-assert!(expected_buf.output.raw_update());
+assert!(expected_buf.output.update());
assert_eq!(buf, expected_buf);
check_buf_state(&mut buf, false);
}
@@ -865,13 +891,12 @@
);
}
-/// When raw mode is enabled, the consumer is allowed to modify its bufffer,
-/// which means that it will unknowingly send back data to the producer.
-/// This creates new correctness requirements for the synchronization
-/// protocol, which must be checked as well.
+/// Through the low-level API, the consumer is allowed to modify its
+/// buffer, which means that it will unknowingly send back data to the
+/// producer. This creates new correctness requirements for the
+/// synchronization protocol, which must be checked as well.
#[test]
#[ignore]
-#[cfg(feature = "raw")]
fn concurrent_bidirectional_exchange() {
// We will stress the infrastructure by performing this many writes
// as a reader continuously reads the latest value
@@ -886,9 +911,9 @@
testbench::concurrent_test_2(
move || {
for new_value in 1..=TEST_WRITE_COUNT {
-match buf_input.raw_input_buffer().get() {
+match buf_input.input_buffer().get() {
Racey::Consistent(curr_value) => {
-assert!(curr_value <= TEST_WRITE_COUNT);
+assert!(curr_value <= new_value);
}
Racey::Inconsistent => {
panic!("Inconsistent state exposed by the buffer!");
@@ -900,7 +925,7 @@
move || {
let mut last_value = 0usize;
while last_value < TEST_WRITE_COUNT {
-match buf_output.raw_output_buffer().get() {
+match buf_output.output_buffer().get() {
Racey::Consistent(new_value) => {
assert!((new_value >= last_value) && (new_value <= TEST_WRITE_COUNT));
last_value = new_value;
@@ -910,8 +935,8 @@
}
}
if buf_output.updated() {
-buf_output.raw_output_buffer().set(last_value / 2);
-buf_output.raw_update();
+buf_output.output_buffer().set(last_value / 2);
+buf_output.update();
}
}
},
@@ -960,7 +985,7 @@
// Check that the "input buffer" query behaves as expected
assert_eq!(
-as_ptr(&buf.input.raw_input_buffer()),
+as_ptr(&buf.input.input_buffer()),
buf.input.shared.buffers[buf.input.input_idx as usize].get()
);
assert_eq!(*buf, initial_buf);
@@ -971,7 +996,7 @@
// Check that the output_buffer query works in the initial state
assert_eq!(
-as_ptr(&buf.output.raw_output_buffer()),
+as_ptr(&buf.output.output_buffer()),
buf.output.shared.buffers[buf.output.output_idx as usize].get()
);
assert_eq!(*buf, initial_buf);
@@ -981,112 +1006,3 @@
assert_eq!(*buf, initial_buf);
}
}
-/// Performance benchmarks
-///
-/// These benchmarks masquerading as tests are a stopgap solution until
-/// benchmarking lands in Stable Rust. They should be compiled in release mode,
-/// and run with only one OS thread. In addition, the default behaviour of
-/// swallowing test output should obviously be suppressed.
-///
-/// TL;DR: cargo test --release -- --ignored --nocapture --test-threads=1
-///
-/// TODO: Switch to standard Rust benchmarks once they are stable
-///
-#[cfg(test)]
-mod benchmarks {
-use super::TripleBuffer;
-use testbench;
-/// Benchmark for clean read performance
-#[test]
-#[ignore]
-fn clean_read() {
-// Create a buffer
-let mut buf = TripleBuffer::new(0u32);
-// Benchmark clean reads
-testbench::benchmark(2_500_000_000, || {
-let read = *buf.output.read();
-assert!(read < u32::max_value());
-});
-}
-/// Benchmark for write performance
-#[test]
-#[ignore]
-fn write() {
-// Create a buffer
-let mut buf = TripleBuffer::new(0u32);
-// Benchmark writes
-let mut iter = 1u32;
-testbench::benchmark(640_000_000, || {
-buf.input.write(iter);
-iter += 1;
-});
-}
-/// Benchmark for write + dirty read performance
-#[test]
-#[ignore]
-fn write_and_dirty_read() {
-// Create a buffer
-let mut buf = TripleBuffer::new(0u32);
-// Benchmark writes + dirty reads
-let mut iter = 1u32;
-testbench::benchmark(290_000_000u32, || {
-buf.input.write(iter);
-iter += 1;
-let read = *buf.output.read();
-assert!(read < u32::max_value());
-});
-}
-/// Benchmark read performance under concurrent write pressure
-#[test]
-#[ignore]
-fn concurrent_read() {
-// Create a buffer
-let buf = TripleBuffer::new(0u32);
-let (mut buf_input, mut buf_output) = buf.split();
-// Benchmark reads under concurrent write pressure
-let mut counter = 0u32;
-testbench::concurrent_benchmark(
-56_000_000u32,
-move || {
-let read = *buf_output.read();
-assert!(read < u32::max_value());
-},
-move || {
-buf_input.write(counter);
-counter = (counter + 1) % u32::max_value();
-},
-);
-}
-/// Benchmark write performance under concurrent read pressure
-#[test]
-#[ignore]
-fn concurrent_write() {
-// Create a buffer
-let buf = TripleBuffer::new(0u32);
-let (mut buf_input, mut buf_output) = buf.split();
-// Benchmark writes under concurrent read pressure
-let mut iter = 1u32;
-testbench::concurrent_benchmark(
-88_000_000u32,
-move || {
-buf_input.write(iter);
-iter += 1;
-},
-move || {
-let read = *buf_output.read();
-assert!(read < u32::max_value());
-},
-);
-}
-}
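To close, here is a self-contained sketch of the post-upgrade 5.0.6 API in a two-thread setup, based solely on the interface shown in this diff (the polling loop is my illustration):

```rust
use std::thread;
use triple_buffer::TripleBuffer;

fn main() {
    let (mut input, mut output) = TripleBuffer::new(0u64).split();

    // Producer: write() moves the value in and publishes it automatically
    let producer = thread::spawn(move || {
        for i in 1..=100 {
            input.write(i);
        }
    });

    // Consumer: read() fetches the latest published value; triple buffers
    // never block, so re-reading a stale value is normal and we just poll
    let mut latest = 0;
    while latest < 100 {
        latest = *output.read();
    }
    producer.join().unwrap();
    println!("final value: {}", latest);
}
```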