Initial version of disruptorplus library.

Library is header-only library at this stage.
Defines core sequence_barrier and sequence_barrier_group classes.
Includes a blocking_wait_strategy that uses a mutex/condition_variable.
Includes a single_threaded_claim_strategy for single-threaded producers.
This commit is contained in:
Lewis Baker 2013-10-08 21:34:55 +10:30
commit af12980e35
14 changed files with 946 additions and 0 deletions

2
.gitignore vendored Normal file
View File

@ -0,0 +1,2 @@
*.cakec
build

13
args.cake Normal file
View File

@ -0,0 +1,13 @@
from cake.script import Script
parser = Script.getCurrent().engine.parser
# Add a project generation option. It will be stored in 'engine.options' which
# can later be accessed in our config.cake.
parser.add_option(
"-p", "--projects",
action="store_true",
dest="createProjects",
help="Create projects instead of building a variant.",
default=False,
)

10
build.cake Normal file
View File

@ -0,0 +1,10 @@
###############################################################################
# C++ Disruptor Library
# Copyright (c) 2013 Lewis Baker
###############################################################################
from cake.tools import script
script.execute(script.cwd([
"test/build.cake",
]))

130
config.cake Normal file
View File

@ -0,0 +1,130 @@
###############################################################################
# DisruptorPlus - C++ Library implementing a Disruptor data-structure
# Copyright (c) 2013 Lewis Baker
###############################################################################
import sys
import cake.path
import cake.system
from cake.engine import Variant
from cake.script import Script
configuration = Script.getCurrent().configuration
engine = configuration.engine
hostPlatform = cake.system.platform().lower()
hostArchitecture = cake.system.architecture().lower()
from cake.library.script import ScriptTool
from cake.library.variant import VariantTool
from cake.library.env import EnvironmentTool
from cake.library.project import ProjectTool
from cake.library.compilers import CompilerNotFoundError
from cake.library.compilers.msvc import findMsvcCompiler
# Setup all of the parameters common to all variants
baseVariant = Variant()
baseVariant.tools["script"] = script = ScriptTool(configuration=configuration)
baseVariant.tools["variant"] = variant = VariantTool(configuration=configuration)
baseVariant.tools["env"] = env = EnvironmentTool(configuration=configuration)
env["DISRUPTOR"] = "."
env["VARIANT"] = "${PLATFORM}_${ARCHITECTURE}_${COMPILER}${COMPILER_VERSION}_${RELEASE}"
env["BUILD"] = "build/${VARIANT}"
env["DISRUPTOR_BUILD"] = "${BUILD}"
env["DISRUPTOR_PROJECT"] = "build/project"
env["DISRUPTOR_BIN"] = "${DISRUPTOR_BUILD}/bin"
env["DISRUPTOR_LIB"] = "${DISRUPTOR_BUILD}/lib"
baseVariant.tools["project"] = project = ProjectTool(configuration=configuration)
project.product = project.VS2010
project.enabled = engine.options.createProjects
if project.enabled:
engine.addBuildSuccessCallback(project.build)
def createVariants(baseVariant):
for release in ["debug", "optimised"]:
variant = baseVariant.clone(release=release)
platform = variant.keywords["platform"]
compilerName = variant.keywords["compiler"]
architecture = variant.keywords["architecture"]
env = variant.tools["env"]
env["RELEASE"] = release
env["BUILD"] = env.expand("${BUILD}")
compiler = variant.tools["compiler"]
compiler.enableRtti = True
compiler.enableExceptions = True
compiler.outputMapFile = True
compiler.messageStyle = compiler.MSVS_CLICKABLE
if release == "debug":
compiler.addDefine("_DEBUG")
compiler.debugSymbols = True
compiler.useIncrementalLinking = True
compiler.optimisation = compiler.NO_OPTIMISATION
compiler.runtimeLibraries = 'debug-dll'
elif release == "optimised":
compiler.addDefine("NDEBUG")
compiler.debugSymbols = True
compiler.useIncrementalLinking = False
compiler.useFunctionLevelLinking = True
compiler.optimisation = compiler.FULL_OPTIMISATION
compiler.runtimeLibraries = 'release-dll'
compiler.addCppFlag('/Oxs')
# Disable the compiler during project generation
if engine.options.createProjects:
compiler.enabled = False
projectTool = variant.tools["project"]
projectTool.projectConfigName = "%s (%s) %s (%s)" % (
platform.capitalize(),
architecture,
release.capitalize(),
compilerName,
)
projectTool.projectPlatformName = "Win32"
projectTool.solutionConfigName = release.capitalize()
projectTool.solutionPlatformName = "%s %s (%s)" % (
platform.capitalize(),
compilerName.capitalize(),
architecture,
)
configuration.addVariant(variant)
if cake.system.isWindows() or cake.system.isCygwin():
for msvcVer in ("11.0",):
for arch in ("x86", "x64"):
try:
msvcVariant = baseVariant.clone(
platform="windows",
compiler="msvc" + msvcVer,
compilerFamily="msvc",
architecture=arch,
)
msvcVariant.tools["compiler"] = compiler = findMsvcCompiler(
configuration=configuration,
architecture=arch,
version=msvcVer,
)
compiler.addDefine("WIN32")
compiler.addDefine("_WIN32_WINNT", "0x0500") # WinXP
if arch in ("x64", "ia64"):
compiler.addDefine("WIN64")
env = msvcVariant.tools["env"]
env["COMPILER"] = "msvc"
env["COMPILER_VERSION"] = msvcVer
env["PLATFORM"] = "windows"
env["ARCHITECTURE"] = arch
createVariants(msvcVariant)
except CompilerNotFoundError:
pass
else:
configuration.engine.raiseError("Unsupported platform: %s\n" % cake.system.platform())

View File

@ -0,0 +1,115 @@
#ifndef DISRUPTORPLUS_BLOCKING_WAIT_STRATEGY_HPP_INCLUDED
#define DISRUPTORPLUS_BLOCKING_WAIT_STRATEGY_HPP_INCLUDED
#include <disruptorplus/sequence.hpp>
#include <atomic>
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <mutex>
namespace disruptorplus
{
// A wait_strategy that blocks waiting threads until the respective
// sequence barriers are have reached the desired sequence numbers.
//
// All currently blocked threads will be woken when any sequence
// barrier publishes a new sequence regardless of whether those
// threads are currently waiting on that sequence barrier or not.
class blocking_wait_strategy
{
public:
blocking_wait_strategy()
{}
// Wait unconditionally until all of the specified sequences
// have at least published the specified sequence value.
// Returns the value of the least-advanced sequence.
sequence_t wait_until_published(
sequence_t sequence,
size_t count,
const std::atomic<sequence_t>* const sequences[])
{
assert(count > 0);
sequence_t result;
{
std::unique_lock<std::mutex> lock(m_mutex);
m_cv.wait(lock, [&]() -> bool {
result = minimum_sequence_after(sequence, count, sequences);
return difference(result, sequence) >= 0;
});
}
return result;
}
// Wait until all of the specified sequences have at least
// published the specified sequence value.
// Timeout if waited longer than specified duration.
// Returns the highest sequence that all sequences have
// published if did not time out.
// If timed out then returns some number such that
// difference(result, sequence) < 0.
template<typename Rep, typename Period>
sequence_t wait_until_published(
sequence_t sequence,
size_t count,
const std::atomic<sequence_t>* const sequences[],
const std::chrono::duration<Rep, Period>& timeout)
{
assert(count > 0);
sequence_t result;
{
std::unique_lock<std::mutex> lock(m_mutex);
m_cv.wait_for(
lock,
[&]() -> bool {
result = minimum_sequence_after(sequence, count, sequences);
return difference(result, sequence) >= 0;
},
timeout);
}
return result;
}
template<typename Clock, typename Duration>
sequence_range wait_until_published(
sequence_t sequence,
size_t count,
const std::atomic<sequence_t>* const sequences[],
const std::chrono::time_point<Clock, Duration>& timeoutTime)
{
assert(count > 0);
sequence_t result;
{
std::unique_lock<std::mutex> lock(m_mutex);
m_cv.wait_until(
lock,
[&]() -> bool {
result = minimum_sequence_after(sequence, count, sequences);
return difference(result, sequence) >= 0;
},
timeoutTime);
}
return result;
}
void signal_all_when_blocking()
{
// Take out a lock here because we don't want to notify other threads
// if they are between checking the sequence values and waiting on
// the condition-variable.
std::unique_lock<std::mutex> lock(m_mutex);
m_cv.notify_all();
}
private:
std::mutex m_mutex;
std::condition_variable m_cv;
};
}
#endif

View File

@ -0,0 +1,12 @@
#ifndef DISRUPTORPLUS_CONFIG_HPP_INCLUDED
#define DISRUPTORPLUS_CONFIG_HPP_INCLUDED
#include <cstddef>
namespace disruptorplus
{
// Suitable for modern x86 architectures.
const size_t CacheLineSize = 64;
}
#endif

View File

@ -0,0 +1,54 @@
#ifndef DISRUPTORPLUS_RING_BUFFER_HPP_INCLUDED
#define DISRUPTORPLUS_RING_BUFFER_HPP_INCLUDED
#include <disruptorplus/sequence.hpp>
#include <memory>
#include <cassert>
namespace disruptorplus
{
template<typename T>
class ring_buffer
{
public:
typedef T value_type;
typedef T& reference_type;
ring_buffer(size_t size)
: m_size(size)
, m_mask(size - 1)
, m_data(new T[size])
{
// Check that size was a power-of-two.
assert(m_size > 0 && (m_size & m_mask) == 0);
}
size_t size() const
{
return m_size;
}
T& operator[](sequence_t seq)
{
return m_data[static_cast<size_t>(seq) & m_mask];
}
const T& operator[](sequence_t seq) const
{
return m_data[static_case<size_t>(seq) & m_mask];
}
private:
ring_buffer(const ring_buffer&);
const size_t m_size;
const size_t m_mask;
std::unique_ptr<T[]> m_data;
};
}
#endif

View File

@ -0,0 +1,65 @@
#ifndef DISRUPTORPLUS_SEQUENCE_HPP_INCLUDED
#define DISRUPTORPLUS_SEQUENCE_HPP_INCLUDED
#include <atomic>
#include <cstdint>
#include <cassert>
namespace disruptorplus
{
typedef uint64_t sequence_t;
typedef int64_t sequence_diff_t;
inline sequence_diff_t difference(sequence_t a, sequence_t b)
{
return static_cast<sequence_diff_t>(a - b);
}
// Calculate the minimum sequence number of the array of sequences
// taking into account wrapping by using the first sequence as the zero-point.
// This assumes no active sequence value will be more than
// (1 << (sizeof(sequence_t) * 8 - 2)) - 1 different from each other.
//
// Implies acquire memory semantics on each of the sequences.
inline sequence_t minimum_sequence(
size_t count,
const std::atomic<sequence_t>* const sequences[])
{
assert(count > 0);
sequence_t current = sequences[0]->load(std::memory_order_acquire);
for (size_t i = 1; i < count; ++i)
{
sequence_t seq = sequences[i]->load(std::memory_order_acquire);
if (difference(seq, current) < 0)
{
current = seq;
}
}
return current;
}
// Calculate the minimum sequence number of the array of sequences
// taking into account wrapping by using a current value as the zero-point.
// This assumes no active sequence value will be more than
// (1 << (sizeof(sequence_t) * 8 - 2)) different from current.
//
// Implies acquire memory semantics on each of the sequences.
// If any of the sequences precede 'current' then return its value
// immediately without checking the others.
inline sequence_t minimum_sequence_after(
sequence_t current,
size_t count,
const std::atomic<sequence_t>* const sequences[])
{
assert(count > 0);
sequence_diff_t minDelta = difference(sequences[0]->load(std::memory_order_acquire), current);
for (size_t i = 1; i < count && minDelta >= 0; ++i)
{
sequence_t seq = sequences[i]->load(std::memory_order_acquire);
minDelta = std::min(minDelta, difference(seq, current));
}
return static_cast<sequence_t>(minDelta + current);
}
}
#endif

View File

@ -0,0 +1,92 @@
#ifndef DISRUPTORPLUS_SEQUENCE_BARRIER_HPP_INCLUDED
#define DISRUPTORPLUS_SEQUENCE_BARRIER_HPP_INCLUDED
#include <disruptorplus/config.hpp>
#include <disruptorplus/sequence.hpp>
#include <atomic>
#include <chrono>
namespace disruptorplus
{
template<typename WaitStrategy>
class sequence_barrier_group;
template<typename WaitStrategy>
class sequence_barrier
{
public:
sequence_barrier(WaitStrategy& waitStrategy)
: m_waitStrategy(waitStrategy)
, m_lastPublished(static_cast<sequence_t>(-1))
{}
sequence_t last_published() const
{
return m_lastPublished.load(std::memory_order_acquire);
}
sequence_t wait_until_published(sequence_t sequence) const
{
sequence_t current = last_published();
sequence_diff_t diff = difference(current, sequence);
if (diff >= 0)
{
return current;
}
const std::atomic<sequence_t>* const sequences[] = { &m_lastPublished };
return m_waitStrategy.wait_until_published(sequence, 1, sequences);
}
template<class Rep, class Period>
sequence_t wait_until_published(
sequence_t sequence,
const std::chrono::duration<Rep, Period>& timeout) const
{
sequence_t current = last_published();
sequence_diff_t diff = difference(current, sequence);
if (diff >= 0)
{
return current;
}
const std::atomic<sequence_t>* const sequences[] = { &m_lastPublished };
return m_waitStrategy.wait_until_published(sequence, 1, sequences, timeout);
}
template<class Clock, class Duration>
sequence_t wait_until_published(
sequence_t sequence,
const std::chrono::time_point<Clock, Duration>& timeoutTime) const
{
sequence_t current = last_published();
sequence_diff_t diff = difference(current, sequence);
if (diff >= 0)
{
return current;
}
const std::atomic<sequence_t>* const sequences[] = { &m_lastPublished };
return m_waitStrategy.wait_until_published(sequence, 1, sequences, timeoutTime);
}
void publish(sequence_t sequence)
{
m_lastPublished.store(sequence, std::memory_order_release);
m_waitStrategy.signal_all_when_blocking();
}
private:
friend class sequence_barrier_group<WaitStrategy>;
WaitStrategy& m_waitStrategy;
// Pad before/after to prevent false-sharing
uint8_t m_pad1[CacheLineSize - sizeof(sequence_t)];
std::atomic<sequence_t> m_lastPublished;
uint8_t m_pad2[CacheLineSize - sizeof(sequence_t)];
};
}
#endif

View File

@ -0,0 +1,97 @@
#ifndef DISRUPTORPLUS_SEQUENCE_BARRIER_GROUP_HPP_INCLUDED
#define DISRUPTORPLUS_SEQUENCE_BARRIER_GROUP_HPP_INCLUDED
#include <disruptorplus/sequence.hpp>
#include <disruptorplus/sequence_barrier.hpp>
#include <cassert>
#include <chrono>
#include <vector>
namespace disruptorplus
{
template<typename WaitStrategy>
class sequence_barrier_group
{
public:
sequence_barrier_group(WaitStrategy& waitStrategy)
: m_waitStrategy(waitStrategy)
{}
void add(const sequence_barrier<WaitStrategy>& barrier)
{
assert(&barrier.m_waitStrategy == &m_waitStrategy);
m_sequences.push_back(&barrier.m_lastPublished);
}
void add(const sequence_barrier_group<WaitStrategy>& barrierGroup)
{
m_sequences.insert(
m_sequences.end(),
barrierGroup.m_sequences.begin(),
barrierGroup.m_sequences.end());
}
sequence_t last_published() const
{
assert(!m_sequences.empty());
return minimum_sequence(m_sequences.size(), m_sequences.data());
}
sequence_t wait_until_published(sequence_t sequence) const
{
assert(!m_sequences.empty());
size_t count = m_sequences.size();
sequence_t current = minimum_sequence_after(sequence, count, m_sequences.data());
if (difference(current, sequence) >= 0)
{
return current;
}
return m_waitStrategy.wait_until_published(sequence, count, m_sequences.data());
}
template<class Rep, class Period>
sequence_t wait_until_published(
sequence_t sequence,
const std::chrono::duration<Rep, Period>& timeout) const
{
assert(!m_sequences.empty());
sequence_t current = minimum_sequence_after(sequence, count, m_sequences.data());
if (difference(current, sequence) >= 0)
{
return current;
}
return m_waitStrategy.wait_until_published(sequence, count, m_sequences.data(), timeout);
}
template<class Clock, class Duration>
sequence_t wait_until_published(
sequence_t sequence,
const std::chrono::time_point<Clock, Duration>& timeoutTime) const
{
assert(!m_sequences.empty());
sequence_t current = minimum_sequence_after(sequence, count, m_sequences.data());
if (difference(current, sequence) >= 0)
{
return current;
}
return m_waitStrategy.wait_until_published(sequence, count, m_sequences.data(), timeoutTime);
}
private:
WaitStrategy& m_waitStrategy;
std::vector<const std::atomic<sequence_t>*> m_sequences;
};
}
#endif

View File

@ -0,0 +1,39 @@
#ifndef DISRUPTORPLUS_SEQUENCE_RANGE_HPP_INCLUDED
#define DISRUPTORPLUS_SEQUENCE_RANGE_HPP_INCLUDED
#include <disruptorplus/sequence.hpp>
namespace disruptorplus
{
class sequence_range
{
public:
// Construct to the empty range.
sequence_range()
: m_first(0)
, m_size(0)
{}
sequence_range(sequence_t first, size_t size)
: m_first(first)
, m_size(size)
{}
size_t size() const { return m_size; }
sequence_t first() const { return m_first; }
sequence_t last() const { return end() - 1; }
sequence_t end() const { return static_cast<sequence_t>(m_first + m_size); }
sequence_t operator[](size_t index) const { return static_cast<sequence_t>(m_first + index); }
private:
sequence_t m_first;
size_t m_size;
};
}
#endif

View File

@ -0,0 +1,196 @@
#ifndef DISRUPTORPLUS_SINGLE_THREADED_CLAIM_STRATEGY_HPP_INCLUDED
#define DISRUPTORPLUS_SINGLE_THREADED_CLAIM_STRATEGY_HPP_INCLUDED
#include <disruptorplus/sequence_range.hpp>
#include <disruptorplus/sequence_barrier.hpp>
#include <disruptorplus/sequence_barrier_group.hpp>
#include <algorithm>
#include <chrono>
#include <cstddef>
namespace disruptorplus
{
template<typename WaitStrategy>
class single_threaded_claim_strategy
{
public:
single_threaded_claim_strategy(
size_t bufferSize,
WaitStrategy& waitStrategy)
: m_bufferSize(bufferSize)
, m_nextSequenceToClaim(0)
, m_lastKnownClaimableSequence(static_cast<sequence_t>(-1))
, m_claimBarrier(waitStrategy)
, m_readBarrier(waitStrategy)
{}
size_t buffer_size() const
{
return m_bufferSize;
}
// The barrier that readers should wait on for published items
const sequence_barrier<WaitStrategy>& read_barrier() const
{
return m_readBarrier;
}
void add_claim_barrier(sequence_barrier<WaitStrategy>& barrier)
{
m_claimBarrier.add(barrier);
m_lastKnownClaimableSequence = m_claimBarrier.last_published() + m_bufferSize;
}
void add_claim_barrier(sequence_barrier_group<WaitStrategy>& barrier)
{
m_claimBarrier.add(barrier);
m_lastKnownClaimableSequence = m_claimBarrier.last_published() + m_bufferSize;
}
sequence_t claim_one()
{
return claim(1).first();
}
sequence_range claim(size_t count)
{
sequence_range result;
if (try_claim(count, result))
{
return result;
}
sequence_t claimable = static_cast<sequence_t>(
m_claimBarrier.wait_until_published(
static_cast<sequence_t>(m_nextSequenceToClaim - m_bufferSize)) +
m_bufferSize);
sequence_diff_t diff = difference(claimable, m_nextSequenceToClaim);
assert(diff >= 0);
size_t available = static_cast<size_t>(diff + 1);
count = std::min(count, available);
result = sequence_range(m_nextSequenceToClaim, count);
m_nextSequenceToClaim += count;
m_lastKnownClaimableSequence = claimable;
return result;
}
bool try_claim(size_t count, sequence_range& range)
{
sequence_diff_t diff = difference(m_lastKnownClaimableSequence, m_nextSequenceToClaim);
if (diff < 0)
{
sequence_t seq = static_cast<sequence_t>(m_claimBarrier.last_published() + m_bufferSize);
diff = difference(seq, m_nextSequenceToClaim);
if (diff < 0)
{
return false;
}
// Only bother updating our cached claimable seq if we will actually be
// claiming something. Otherwise our existing cached value already indicates
// that we need to check again next time already.
m_lastKnownClaimableSequence = seq;
}
assert(diff >= 0);
size_t available = static_cast<size_t>(diff + 1);
count = std::min(count, available);
range = sequence_range(m_nextSequenceToClaim, count);
m_nextSequenceToClaim += count;
return true;
}
template<class Rep, class Period>
bool try_claim_for(
size_t count,
sequence_range& range,
const std::chrono::duration<Rep, Period>& timeout)
{
if (try_claim(count, range))
{
return true;
}
sequence_t claimable =
static_cast<sequence_t>(
m_claimBarrier.wait_until_published(
static_cast<sequence_t>(m_nextSequenceToClaim - m_bufferSize),
timeout) + m_bufferSize);
sequence_diff_t diff = difference(claimable, m_nextSequenceToClaim);
if (diff < 0)
{
// Timeout
return false;
}
size_t available = static_cast<size_t>(diff + 1);
count = std::min(count, available);
range = sequence_range(m_nextSequenceToClaim, count);
m_nextSequenceToClaim += count;
m_lastKnownClaimableSequence = claimable;
return true;
}
template<class Clock, class Duration>
bool try_claim_until(
size_t count,
const std::chrono::time_point<Clock, Duration>& timeoutTime,
sequence_range& range)
{
if (try_claim(count, range))
{
return true;
}
sequence_t claimable =
static_cast<sequence_t>(
m_claimBarrier.wait_until_published(
static_cast<sequence_t>(m_nextSequenceToClaim - m_bufferSize),
timeoutTime) + m_bufferSize);
sequence_diff_t diff = difference(claimable, m_nextSequenceToClaim);
if (diff < 0)
{
// Timeout
return false;
}
size_t available = static_cast<size_t>(diff + 1);
count = std::min(count, available);
range = sequence_range(m_nextSequenceToClaim, count);
m_nextSequenceToClaim += count;
m_lastKnownClaimableSequence = claimable;
return true;
}
void publish(sequence_t sequence)
{
m_readBarrier.publish(sequence);
}
private:
const size_t m_bufferSize;
// The next sequence to be claimed (may not yet be available).
sequence_t m_nextSequenceToClaim;
// A cache of the last-known available sequence value
// queried using m_claimBarrier.
sequence_t m_lastKnownClaimableSequence;
sequence_barrier_group<WaitStrategy> m_claimBarrier;
// Barrier used to publish items to the
sequence_barrier<WaitStrategy> m_readBarrier;
};
}
#endif

14
test/build.cake Normal file
View File

@ -0,0 +1,14 @@
from cake.tools import script, env, compiler
compiler.addIncludePath(env.expand("${DISRUPTOR}/include"))
objects = compiler.objects(
targetDir=env.expand("${DISRUPTOR_BUILD}"),
sources=script.cwd([
"test_1.cpp",
]))
program = compiler.program(
target=env.expand("${DISRUPTOR_BUILD}/test_1"),
sources=objects,
)

107
test/test_1.cpp Normal file
View File

@ -0,0 +1,107 @@
#include <disruptorplus/ring_buffer.hpp>
#include <disruptorplus/single_threaded_claim_strategy.hpp>
#include <disruptorplus/blocking_wait_strategy.hpp>
#include <disruptorplus/sequence_barrier.hpp>
#include <iostream>
#include <thread>
#include <cassert>
using namespace disruptorplus;
struct message
{
uint32_t m_type;
uint8_t m_data[28];
};
int main(int argc, char* argv[])
{
const int itemCount = 10000000;
const size_t bufferSize = size_t(1) << 17;
blocking_wait_strategy waitStrategy;
single_threaded_claim_strategy<blocking_wait_strategy> claimStrategy(bufferSize, waitStrategy);
sequence_barrier<blocking_wait_strategy> finishedReading(waitStrategy);
claimStrategy.add_claim_barrier(finishedReading);
ring_buffer<message> buffer(bufferSize);
uint64_t result;
auto start = std::chrono::high_resolution_clock::now();
std::thread reader([&]() {
const auto& readBarrier = claimStrategy.read_barrier();
bool exit = false;
uint64_t sum = 0;
sequence_t nextToRead = 0;
while (!exit)
{
sequence_t available = readBarrier.wait_until_published(nextToRead);
assert(difference(available, nextToRead) >= 0);
do
{
auto& message = buffer[nextToRead];
if (message.m_type == 0xdead)
{
exit = true;
}
else if (message.m_type == 0xadd)
{
for (int i = 0; i < 28; ++i)
{
sum += message.m_data[i];
}
}
else if (message.m_type == 0xdec)
{
for (int i = 0; i < 28; ++i)
{
sum -= message.m_data[i];
}
}
} while (nextToRead++ != available);
finishedReading.publish(available);
}
result = sum;
});
std::thread writer([&]() {
for (int i = 0; i < itemCount; ++i)
{
sequence_t seq = claimStrategy.claim_one();
auto& item = buffer[seq];
item.m_type = i % 5 == 0 ? 0xadd : 0xdec;
for (int j = 0; j < 28; ++j)
{
item.m_data[j] = (i + j) % 60;
}
if (seq % 256 == 0)
{
claimStrategy.publish(seq);
}
}
sequence_t seq = claimStrategy.claim_one();
auto& item = buffer[seq];
item.m_type = 0xdead;
claimStrategy.publish(seq);
});
reader.join();
writer.join();
auto end = std::chrono::high_resolution_clock::now();
auto dur = (end - start);
auto durMS = std::chrono::duration_cast<std::chrono::milliseconds>(dur);
auto durNS = std::chrono::duration_cast<std::chrono::nanoseconds>(dur);
auto nsPerItem = durNS / itemCount;
std::cout << result << "\n"
<< durMS.count() << "ms total time\n"
<< nsPerItem.count() << "ns per item (avg)\n"
<< std::flush;
return 0;
}