From 3b72f1fb2fae677eff2f634f2979df25c9957b8f Mon Sep 17 00:00:00 2001
From: Paul Adenot
Date: Fri, 12 Jun 2020 13:12:55 +0000
Subject: [PATCH] Bug 1626918 - Split the MPSC queue in its own file, and
 generalize it for any type. r=achronop

The allocation still needs to be a power of two so the ergonomics are not
amazing, but this will do until we replace with a buffer-based MPSC
ringbuffer.

Differential Revision: https://phabricator.services.mozilla.com/D78499
---
 dom/media/AsyncLogger.h | 149 ----------------------------------------
 dom/media/MPSCQueue.h   | 148 +++++++++++++++++++++++++++++++++++++++
 dom/media/moz.build     |   1 +
 3 files changed, 149 insertions(+), 149 deletions(-)
 create mode 100644 dom/media/MPSCQueue.h

diff --git a/dom/media/AsyncLogger.h b/dom/media/AsyncLogger.h
index 763004b9e2e9..51e0084400e1 100644
--- a/dom/media/AsyncLogger.h
+++ b/dom/media/AsyncLogger.h
@@ -16,155 +16,6 @@ namespace mozilla {
 
-namespace detail {
-
-// This class implements a lock-free multiple producer single consumer queue of
-// fixed size log messages, with the following characteristics:
-// - Unbounded (uses a intrinsic linked list)
-// - Allocates on Push. Push can be called on any thread.
-// - Deallocates on Pop. Pop MUST always be called on the same thread for the
-// life-time of the queue.
-//
-// In our scenario, the producer threads are real-time, they can't block. The
-// consummer thread runs every now and then and empties the queue to a log
-// file, on disk.
-//
-// Having fixed size messages and jemalloc is probably not the fastest, but
-// allows having a simpler design, we count on the fact that jemalloc will get
-// the memory from a thread-local source most of the time.
-template <size_t MESSAGE_LENGTH>
-class MPSCQueue {
- public:
-  struct Message {
-    Message() { mNext.store(nullptr, std::memory_order_relaxed); }
-    Message(const Message& aMessage) = delete;
-    void operator=(const Message& aMessage) = delete;
-
-    char data[MESSAGE_LENGTH];
-    std::atomic<Message*> mNext;
-  };
-  // Creates a new MPSCQueue. Initially, the queue has a single sentinel node,
-  // pointed to by both mHead and mTail.
-  MPSCQueue()
-      // At construction, the initial message points to nullptr (it has no
-      // successor). It is a sentinel node, that does not contain meaningful
-      // data.
-      : mHead(new Message()), mTail(mHead.load(std::memory_order_relaxed)) {}
-
-  ~MPSCQueue() {
-    Message dummy;
-    while (this->Pop(dummy.data)) {
-    }
-    Message* front = mHead.load(std::memory_order_relaxed);
-    delete front;
-  }
-
-  void Push(MPSCQueue::Message* aMessage) {
-    // The next two non-commented line are called A and B in this paragraph.
-    // Producer threads i, i-1, etc. are numbered in the order they reached
-    // A in time, thread i being the thread that has reached A first.
-    // Atomically, on line A the new `mHead` is set to be the node that was
-    // just allocated, with strong memory order. From now one, any thread
-    // that reaches A will see that the node just allocated is
-    // effectively the head of the list, and will make itself the new head
-    // of the list.
-    // In a bad case (when thread i executes A and then
-    // is not scheduled for a long time), it is possible that thread i-1 and
-    // subsequent threads create a seemingly disconnected set of nodes, but
-    // they all have the correct value for the next node to set as their
-    // mNext member on their respective stacks (in `prev`), and this is
-    // always correct. When the scheduler resumes, and line B is executed,
-    // the correct linkage is resumed.
-    // Before line B, since mNext for the node the was the last element of
-    // the queue still has an mNext of nullptr, Pop will not see the node
-    // added.
-    // For line A, it's critical to have strong ordering both ways (since
-    // it's going to possibly be read and write repeatidly by multiple
-    // threads)
-    // Line B can have weaker guarantees, it's only going to be written by a
-    // single thread, and we just need to ensure it's read properly by a
-    // single other one.
-    Message* prev = mHead.exchange(aMessage, std::memory_order_acq_rel);
-    prev->mNext.store(aMessage, std::memory_order_release);
-  }
-
-  // Allocates a new node, copy aInput to the new memory location, and pushes
-  // it to the end of the list.
-  void Push(const char aInput[MESSAGE_LENGTH]) {
-    // Create a new message, and copy the messages passed on argument to the
-    // new memory location. We are not touching the queue right now. The
-    // successor for this new node is set to be nullptr.
-    Message* msg = new Message();
-    strncpy(msg->data, aInput, MESSAGE_LENGTH);
-
-    Push(msg);
-  }
-
-  // Copy the content of the first message of the queue to aOutput, and
-  // frees the message. Returns true if there was a message, in which case
-  // `aOutput` contains a valid value. If the queue was empty, returns false,
-  // in which case `aOutput` is left untouched.
-  bool Pop(char aOutput[MESSAGE_LENGTH]) {
-    // Similarly, in this paragraph, the two following lines are called A
-    // and B, and threads are called thread i, i-1, etc. in order of
-    // execution of line A.
-    // On line A, the first element of the queue is acquired. It is simply a
-    // sentinel node.
-    // On line B, we acquire the node that has the data we want. If B is
-    // null, then only the sentinel node was present in the queue, we can
-    // safely return false.
-    // mTail can be loaded with relaxed ordering, since it's not written nor
-    // read by any other thread (this queue is single consumer).
-    // mNext can be written to by one of the producer, so it's necessary to
-    // ensure those writes are seen, hence the stricter ordering.
-    Message* tail = mTail.load(std::memory_order_relaxed);
-    Message* next = tail->mNext.load(std::memory_order_acquire);
-
-    if (next == nullptr) {
-      return false;
-    }
-
-    strncpy(aOutput, next->data, MESSAGE_LENGTH);
-
-    // Simply shift the queue one node further, so that the sentinel node is
-    // now pointing to the correct most ancient node. It contains stale data,
-    // but this data will never be read again.
-    // It's only necessary to ensure the previous load on this thread is not
-    // reordered past this line, so release ordering is sufficient here.
-    mTail.store(next, std::memory_order_release);
-
-    // This thread is now the only thing that points to `tail`, it can be
-    // safely deleted.
-    delete tail;
-
-    return true;
-  }
-
- private:
-  // An atomic pointer to the most recent message in the queue.
-  std::atomic<Message*> mHead;
-  // An atomic pointer to a sentinel node, that points to the oldest message
-  // in the queue.
-  std::atomic<Message*> mTail;
-
-  MPSCQueue(const MPSCQueue&) = delete;
-  void operator=(const MPSCQueue&) = delete;
-
- public:
-  // The goal here is to make it easy on the allocator. We pack a pointer in the
-  // message struct, and we still want to do power of two allocations to
-  // minimize allocator slop. The allocation size are going to be constant, so
-  // the allocation is probably going to hit the thread local cache in jemalloc,
-  // making it cheap and, more importantly, lock-free enough.
-  static const size_t MESSAGE_PADDING = sizeof(Message::mNext);
-
- private:
-  static_assert(IsPowerOfTwo(MESSAGE_LENGTH + MESSAGE_PADDING),
-                "MPSCQueue internal allocations must have a size that is a"
-                "power of two ");
-};
-}  // end namespace detail
-
 // This class implements a lock-free asynchronous logger, that outputs to
 // MOZ_LOG.
 // Any thread can use this logger without external synchronization and without
diff --git a/dom/media/MPSCQueue.h b/dom/media/MPSCQueue.h
new file mode 100644
index 000000000000..67641f45fa4a
--- /dev/null
+++ b/dom/media/MPSCQueue.h
@@ -0,0 +1,148 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+#ifndef mozilla_dom_MPSCQueue_h
+#define mozilla_dom_MPSCQueue_h
+
+namespace mozilla {
+
+// This class implements a lock-free multiple producer single consumer queue of
+// fixed size log messages, with the following characteristics:
+// - Unbounded (uses an intrusive linked list)
+// - Allocates on Push. Push can be called on any thread.
+// - Deallocates on Pop. Pop MUST always be called on the same thread for the
+// life-time of the queue.
+//
+// In our scenario, the producer threads are real-time, they can't block. The
+// consumer thread runs every now and then and empties the queue to a log
+// file, on disk.
+//
+// Having fixed size messages and jemalloc is probably not the fastest, but
+// allows having a simpler design; we count on the fact that jemalloc will get
+// the memory from a thread-local source most of the time. We'll replace
+// this with a fixed-size ring buffer if this becomes an issue.
+const size_t MPSC_MSG_RESERVERD = sizeof(void*);
+
+template <typename T>
+class MPSCQueue {
+ public:
+  struct Message {
+    Message() { mNext.store(nullptr, std::memory_order_relaxed); }
+    Message(const Message& aMessage) = delete;
+    void operator=(const Message& aMessage) = delete;
+
+    T data;
+    std::atomic<Message*> mNext;
+  };
+
+  // The goal here is to make it easy on the allocator. We pack a pointer in the
+  // message struct, and we still want to do power of two allocations to
+  // minimize allocator slop. The allocation sizes are going to be constant, so
+  // the allocation is probably going to hit the thread local cache in jemalloc,
+  // making it cheap and, more importantly, lock-free enough. This has been
+  // measured to be cheap and reliable enough, but will be replaced in the
+  // longer run.
+  static_assert(IsPowerOfTwo(sizeof(MPSCQueue::Message)),
+                "MPSCQueue internal allocations must have a size that is a "
+                "power of two");
+
+  // Creates a new MPSCQueue. Initially, the queue has a single sentinel node,
+  // pointed to by both mHead and mTail.
+  MPSCQueue()
+      // At construction, the initial message points to nullptr (it has no
+      // successor). It is a sentinel node, that does not contain meaningful
+      // data.
+      : mHead(new Message()), mTail(mHead.load(std::memory_order_relaxed)) {}
+
+  ~MPSCQueue() {
+    Message dummy;
+    while (Pop(&dummy.data)) {
+    }
+    Message* front = mHead.load(std::memory_order_relaxed);
+    delete front;
+  }
+
+  void Push(MPSCQueue::Message* aMessage) {
+    // The next two non-commented lines are called A and B in this paragraph.
+    // Producer threads i, i-1, etc. are numbered in the order they reached
+    // A in time, thread i being the thread that has reached A first.
+    // Atomically, on line A the new `mHead` is set to be the node that was
+    // just allocated, with strong memory order. From now on, any thread
+    // that reaches A will see that the node just allocated is
+    // effectively the head of the list, and will make itself the new head
+    // of the list.
+    // In a bad case (when thread i executes A and then
+    // is not scheduled for a long time), it is possible that thread i-1 and
+    // subsequent threads create a seemingly disconnected set of nodes, but
+    // they all have the correct value for the next node to set as their
+    // mNext member on their respective stacks (in `prev`), and this is
+    // always correct. When the scheduler resumes, and line B is executed,
+    // the correct linkage is resumed.
+    // Before line B, the node that was the last element of the queue still
+    // has an mNext of nullptr, so Pop will not see the node that was just
+    // added.
+    // For line A, it's critical to have strong ordering both ways (since
+    // it's going to possibly be read and written repeatedly by multiple
+    // threads).
+    // Line B can have weaker guarantees, it's only going to be written by a
+    // single thread, and we just need to ensure it's read properly by a
+    // single other one.
+    Message* prev = mHead.exchange(aMessage, std::memory_order_acq_rel);
+    prev->mNext.store(aMessage, std::memory_order_release);
+  }
+
+  // Copy the content of the first message of the queue to aOutput, and
+  // free the message. Returns true if there was a message, in which case
+  // `aOutput` contains a valid value. If the queue was empty, returns false,
+  // in which case `aOutput` is left untouched.
+  bool Pop(T* aOutput) {
+    // Similarly, in this paragraph, the two following lines are called A
+    // and B, and threads are called thread i, i-1, etc. in order of
+    // execution of line A.
+    // On line A, the first element of the queue is acquired. It is simply a
+    // sentinel node.
+    // On line B, we acquire the node that has the data we want. If B is
+    // null, then only the sentinel node was present in the queue, we can
+    // safely return false.
+    // mTail can be loaded with relaxed ordering, since it's not written nor
+    // read by any other thread (this queue is single consumer).
+    // mNext can be written to by one of the producers, so it's necessary to
+    // ensure those writes are seen, hence the stricter ordering.
+    Message* tail = mTail.load(std::memory_order_relaxed);
+    Message* next = tail->mNext.load(std::memory_order_acquire);
+
+    if (next == nullptr) {
+      return false;
+    }
+
+    *aOutput = next->data;
+
+    // Simply shift the queue one node further, so that the sentinel node is
+    // now pointing to the correct most ancient node. It contains stale data,
+    // but this data will never be read again.
+    // It's only necessary to ensure the previous load on this thread is not
+    // reordered past this line, so release ordering is sufficient here.
+    mTail.store(next, std::memory_order_release);
+
+    // This thread is now the only thing that points to `tail`, it can be
+    // safely deleted.
+    delete tail;
+
+    return true;
+  }
+
+ private:
+  // An atomic pointer to the most recent message in the queue.
+  std::atomic<Message*> mHead;
+  // An atomic pointer to a sentinel node, that points to the oldest message
+  // in the queue.
+  std::atomic<Message*> mTail;
+
+  MPSCQueue(const MPSCQueue&) = delete;
+  void operator=(const MPSCQueue&) = delete;
+};
+
+}  // namespace mozilla
+
+#endif  // mozilla_dom_MPSCQueue_h
diff --git a/dom/media/moz.build b/dom/media/moz.build
index 8acbe9866413..901e33868143 100644
--- a/dom/media/moz.build
+++ b/dom/media/moz.build
@@ -169,6 +169,7 @@ EXPORTS += [
     'MediaTrackList.h',
     'MediaTrackListener.h',
     'MemoryBlockCache.h',
+    'MPSCQueue.h',
     'nsIDocumentActivity.h',
     'PrincipalChangeObserver.h',
     'PrincipalHandle.h',
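
Since the patch only contains the queue itself, the following is a minimal usage sketch of the generalized API it introduces, not part of the change: the uint64_t payload, the thread setup and the message counts are illustrative assumptions, and it presumes the header's own dependencies (std::atomic, IsPowerOfTwo) are reachable, as they are when building inside the Gecko tree.

// Minimal usage sketch (illustrative, not part of the patch): two producer
// threads push heap-allocated Message nodes, and a single thread pops them,
// matching the queue's multiple-producer / single-consumer contract.
#include <cstdint>
#include <cstdio>
#include <thread>

#include "MPSCQueue.h"  // assumes dom/media is on the include path

int main() {
  // An 8-byte payload plus the 8-byte mNext pointer gives a 16-byte Message
  // on a 64-bit platform, which satisfies the power-of-two static_assert.
  using Queue = mozilla::MPSCQueue<uint64_t>;
  Queue queue;

  auto producer = [&queue](uint64_t aBase) {
    for (uint64_t i = 0; i < 1000; i++) {
      // Push hands ownership of the node to the queue; Pop deletes it.
      auto* msg = new Queue::Message();
      msg->data = aBase + i;
      queue.Push(msg);
    }
  };

  std::thread t1(producer, uint64_t(0));
  std::thread t2(producer, uint64_t(1000000));
  t1.join();
  t2.join();

  // Pop MUST always run on the same thread; here that is the main thread.
  uint64_t value;
  size_t count = 0;
  while (queue.Pop(&value)) {
    count++;
  }
  printf("popped %zu messages\n", count);
  return 0;
}

Push never blocks and only allocates, which is why it is safe to call from the real-time producer threads, while all deallocation stays on the single consumer thread inside Pop.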
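
The element type also has to be chosen with the power-of-two static_assert in mind: sizeof(Message) is the payload plus the intrusive mNext pointer, and MPSC_MSG_RESERVERD is there so callers can subtract that pointer out when sizing a buffer payload. A sketch of that arithmetic follows, with an assumed 64-byte allocation target and a hypothetical LogPayload type, neither of which comes from the patch.

// Hypothetical payload sized so that MPSCQueue<LogPayload>::Message ends up
// at exactly 64 bytes on a 64-bit platform: 56 bytes of text plus the 8-byte
// intrusive mNext pointer. The 64-byte target is an assumption.
#include <cstddef>

#include "MPSCQueue.h"  // assumes dom/media is on the include path

constexpr size_t kTargetAllocation = 64;  // assumed power-of-two target

struct LogPayload {
  char mText[kTargetAllocation - mozilla::MPSC_MSG_RESERVERD];
};

// Mirrors the queue's own static_assert: payload + mNext must be a power of
// two, so jemalloc can serve the allocation without slop.
static_assert(sizeof(mozilla::MPSCQueue<LogPayload>::Message) ==
                  kTargetAllocation,
              "LogPayload must leave room for the intrusive mNext pointer");

The removed AsyncLogger code did the same arithmetic with MESSAGE_LENGTH + MESSAGE_PADDING, so this mirrors the intended use of the new constant.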