Bug 1844181 - Add memory synchronization to SPSCQueue's thread id reset methods. r=decoder,padenot,handyman

Without this, the necessary synchronization must be provided externally.

This fixes the memory order in the following case of changing producer thread:
- Thread A does SPSCQueue::Enqueue
  - non-atomic write into the ring buffer, at memory location X
  - mWriteIndex.load(relaxed)
  - mWriteIndex.store(release)
- Producer thread is switched to B, no external memory order synchronization is
  provided, but thread B is guaranteed to run after thread A has finished its
  Enqueue task.
- Thread B does SPSCQueue::Enqueue
  - mWriteIndex.load(relaxed)
  - mWriteIndex.store(release)
- Thread C does SPSCQueue::Dequeue
  - mWriteIndex.load(acquire)
  - non-atomic read from the ring buffer, at memory location X

In this scenario, there is no memory synchronization between threads A and B,
and therefore the non-atomic read on C is a data race, and flagged as such by
TSAN.

A similar scenario can be applied to changing the consumer thread, if first A
enqueues, then B dequeues, then C dequeues. However, since Dequeue doesn't
necessarily (MoveOrCopy) do non-atomic writes to the ring buffer, and more
importantly, since Enqueue doesn't do non-atomic reads from the ring buffer,
this is less of a problem.

Differential Revision: https://phabricator.services.mozilla.com/D190084
This commit is contained in:
Andreas Pehrson 2023-10-09 13:02:58 +00:00
parent dbf63dd2c9
commit 76a7426f90
5 changed files with 89 additions and 25 deletions

View File

@ -69,9 +69,6 @@ void AudioInputSource::Start() {
// operations to the task thread.
MOZ_ASSERT(mTaskThread);
// mSPSCQueue will have a new consumer.
mSPSCQueue.ResetConsumerThreadId();
LOG("AudioInputSource %p, start", this);
MOZ_ALWAYS_SUCCEEDS(mTaskThread->Dispatch(
NS_NewRunnableFunction(__func__, [self = RefPtr(this)]() mutable {

View File

@ -694,7 +694,7 @@ void AudioClock::UpdateFrameHistory(uint32_t aServiced, uint32_t aUnderrun,
bool aAudioThreadChanged) {
#ifdef XP_MACOSX
if (aAudioThreadChanged) {
mCallbackInfoQueue.ResetThreadIds();
mCallbackInfoQueue.ResetProducerThreadId();
}
// Flush the local items, if any, and then attempt to enqueue the current
// item. This is only a fallback mechanism, under non-critical load this is

View File

@ -196,7 +196,7 @@ TimeUnit AudioSink::UnplayedDuration() const {
void AudioSink::ReenqueueUnplayedAudioDataIfNeeded() {
// This is OK: the AudioStream has been shut down. ShutDown guarantees that
// the audio callback thread won't call back again.
mProcessedSPSCQueue->ResetThreadIds();
mProcessedSPSCQueue->ResetConsumerThreadId();
// construct an AudioData
int sampleInRingbuffer = mProcessedSPSCQueue->AvailableRead();
@ -355,7 +355,7 @@ uint32_t AudioSink::PopFrames(AudioDataValue* aBuffer, uint32_t aFrames,
// happen when not using cubeb remoting, and often when changing audio device
// at the system level.
if (aAudioThreadChanged) {
mProcessedSPSCQueue->ResetThreadIds();
mProcessedSPSCQueue->ResetConsumerThreadId();
}
TRACE_COMMENT("AudioSink::PopFrames", "%u frames (ringbuffer: %u/%u)",

View File

@ -255,26 +255,39 @@ class SPSCRingBufferBase {
* @return The maximum Capacity of this ring buffer.
*/
int Capacity() const { return StorageCapacity() - 1; }
/**
* Reset the consumer and producer thread identifier, in case the threads are
* being changed. This has to be externally synchronized. This is no-op when
* asserts are disabled.
*/
void ResetThreadIds() {
ResetProducerThreadId();
ResetConsumerThreadId();
}
/**
* Reset the consumer thread id to the current thread. The caller must
* guarantee that the last call to Dequeue() on the previous consumer thread
* has completed, and subsequent calls to Dequeue() will only happen on the
* current thread.
*/
void ResetConsumerThreadId() {
#ifdef DEBUG
mConsumerId = std::thread::id();
mConsumerId = std::this_thread::get_id();
#endif
// When changing consumer from thread A to B, the last Dequeue on A (synced
// by mReadIndex.store with memory_order_release) must be picked up by B
// through an acquire operation.
std::ignore = mReadIndex.load(std::memory_order_acquire);
}
/**
* Reset the producer thread id to the current thread. The caller must
* guarantee that the last call to Enqueue() on the previous consumer thread
* has completed, and subsequent calls to Dequeue() will only happen on the
* current thread.
*/
void ResetProducerThreadId() {
#ifdef DEBUG
mProducerId = std::thread::id();
mProducerId = std::this_thread::get_id();
#endif
// When changing producer from thread A to B, the last Enqueue on A (synced
// by mWriteIndex.store with memory_order_release) must be picked up by B
// through an acquire operation.
std::ignore = mWriteIndex.load(std::memory_order_acquire);
}
private:
@ -364,7 +377,7 @@ class SPSCRingBufferBase {
* called by the right thread.
*
* The role of the thread are assigned the first time they call Enqueue or
* Dequeue, and cannot change, except when ResetThreadIds is called..
* Dequeue, and cannot change, except by a ResetThreadId method.
*
* @param id the id of the thread that has called the calling method first.
*/

View File

@ -161,25 +161,79 @@ const size_t ENQUEUE_SIZE = RING_BUFFER_SIZE / 2;
void TestResetAPI() {
SPSCQueue<float> ring(RING_BUFFER_SIZE);
std::thread t([&ring] {
std::thread p([&ring] {
std::unique_ptr<float[]> inBuffer(new float[ENQUEUE_SIZE]);
int rv = ring.Enqueue(inBuffer.get(), ENQUEUE_SIZE);
MOZ_RELEASE_ASSERT(rv > 0);
});
t.join();
p.join();
ring.ResetThreadIds();
std::thread c([&ring] {
std::unique_ptr<float[]> outBuffer(new float[ENQUEUE_SIZE]);
int rv = ring.Dequeue(outBuffer.get(), ENQUEUE_SIZE);
MOZ_RELEASE_ASSERT(rv > 0);
});
// Enqueue with a different thread. We have reset the thread ID
// in the ring buffer, this should work.
std::thread t2([&ring] {
c.join();
// Enqueue with a different thread. We reset the thread ID in the ring buffer,
// this should work.
std::thread p2([&ring] {
ring.ResetProducerThreadId();
std::unique_ptr<float[]> inBuffer(new float[ENQUEUE_SIZE]);
int rv = ring.Enqueue(inBuffer.get(), ENQUEUE_SIZE);
MOZ_RELEASE_ASSERT(rv > 0);
});
t2.join();
p2.join();
// Dequeue with a different thread. We reset the thread ID in the ring buffer,
// this should work.
std::thread c2([&ring] {
ring.ResetConsumerThreadId();
std::unique_ptr<float[]> outBuffer(new float[ENQUEUE_SIZE]);
int rv = ring.Dequeue(outBuffer.get(), ENQUEUE_SIZE);
MOZ_RELEASE_ASSERT(rv > 0);
});
c2.join();
// Similarly, but do the Enqueues without a Dequeue in between, since a
// Dequeue could affect memory ordering.
std::thread p4;
std::thread p3([&] {
ring.ResetProducerThreadId();
std::unique_ptr<float[]> inBuffer(new float[ENQUEUE_SIZE]);
int rv = ring.Enqueue(inBuffer.get(), ENQUEUE_SIZE);
MOZ_RELEASE_ASSERT(rv > 0);
p4 = std::thread([&ring] {
ring.ResetProducerThreadId();
std::unique_ptr<float[]> inBuffer(new float[ENQUEUE_SIZE]);
int rv = ring.Enqueue(inBuffer.get(), ENQUEUE_SIZE);
MOZ_RELEASE_ASSERT(rv > 0);
});
});
p3.join();
p4.join();
std::thread c4;
std::thread c3([&] {
ring.ResetConsumerThreadId();
std::unique_ptr<float[]> outBuffer(new float[ENQUEUE_SIZE]);
int rv = ring.Dequeue(outBuffer.get(), ENQUEUE_SIZE);
MOZ_RELEASE_ASSERT(rv > 0);
c4 = std::thread([&ring] {
ring.ResetConsumerThreadId();
std::unique_ptr<float[]> outBuffer(new float[ENQUEUE_SIZE]);
int rv = ring.Dequeue(outBuffer.get(), ENQUEUE_SIZE);
MOZ_RELEASE_ASSERT(rv > 0);
});
});
c3.join();
c4.join();
}
void TestMove() {