Added some benchmark utilities to allow comparison with other implementations.

Adds benchmarks for communicating a single 64-bit integer between threads in
the following thread topologies:
- unicast (1->1)
- multicast (1->3)
- pipeline (1->1->1->1)
- diamond (1->2->1)
- sequencer (3->1)
This commit is contained in:
Lewis Baker 2014-07-30 22:23:09 +09:30
parent afc2468c9f
commit 1dafeae1a4
8 changed files with 678 additions and 0 deletions

20
benchmark/build.cake Normal file
View File

@ -0,0 +1,20 @@
from cake.tools import script, env, compiler
compiler.addIncludePath(env.expand("${DISRUPTOR}/include"))
benchmarks = ["unicast",
"multicast",
"sequencer",
"pipeline",
"diamond",
]
programs = []
for benchmark in benchmarks:
programs.append(
compiler.program(
target=env.expand("${DISRUPTOR_BUILD}/" + benchmark),
sources=compiler.objects(
targetDir=env.expand("${DISRUPTOR_BUILD}"),
sources=script.cwd([benchmark + ".cpp"]),
)))

159
benchmark/diamond.cpp Normal file
View File

@ -0,0 +1,159 @@
#include <disruptorplus/single_threaded_claim_strategy.hpp>
#include <disruptorplus/multi_threaded_claim_strategy.hpp>
#include <disruptorplus/blocking_wait_strategy.hpp>
#include <disruptorplus/spin_wait_strategy.hpp>
#include <disruptorplus/sequence_barrier_group.hpp>
#include <disruptorplus/ring_buffer.hpp>
#include <thread>
#include <chrono>
#include <cstdint>
#include <iostream>
namespace
{
template<typename WaitStrategy, template<typename T> class ClaimStrategy>
uint64_t CalculateOpsPerSecond(size_t bufferSize, uint64_t iterationCount, int consumerCount)
{
WaitStrategy waitStrategy;
std::vector<std::unique_ptr<disruptorplus::sequence_barrier<WaitStrategy>>> consumedBarriers(consumerCount);
ClaimStrategy<WaitStrategy> claimStrategy(bufferSize, waitStrategy);
disruptorplus::sequence_barrier_group<WaitStrategy> parallelConsumers(waitStrategy);
disruptorplus::ring_buffer<uint64_t> buffer(bufferSize);
for (int i = 0; i < consumerCount; ++i)
{
consumedBarriers[i].reset(new disruptorplus::sequence_barrier<WaitStrategy>(waitStrategy));
if (i + 1 != consumerCount)
{
parallelConsumers.add(*consumedBarriers[i]);
}
}
claimStrategy.add_claim_barrier(*consumedBarriers.back());
const uint64_t expectedResult = (iterationCount * (iterationCount - 1)) / 2;
std::vector<uint64_t> results(consumerCount);
std::vector<std::thread> consumers;
consumers.reserve(consumerCount);
// Consumers
for (int consumerIndex = 0; consumerIndex < consumerCount; ++consumerIndex)
{
if (consumerIndex + 1 != consumerCount)
{
consumers.emplace_back([&, consumerIndex]()
{
uint64_t sum = 0;
disruptorplus::sequence_t nextToRead = 0;
uint64_t itemsRemaining = iterationCount;
auto& barrier = *consumedBarriers[consumerIndex];
while (itemsRemaining > 0)
{
const auto available = claimStrategy.wait_until_published(nextToRead, nextToRead - 1);
do
{
sum += buffer[nextToRead];
--itemsRemaining;
} while (nextToRead++ != available);
barrier.publish(available);
}
results[consumerIndex] = sum;
});
}
else
{
consumers.emplace_back([&, consumerIndex]()
{
uint64_t sum = 0;
disruptorplus::sequence_t nextToRead = 0;
uint64_t itemsRemaining = iterationCount;
auto& barrier = *consumedBarriers[consumerIndex];
while (itemsRemaining > 0)
{
const auto available = parallelConsumers.wait_until_published(nextToRead);
do
{
sum += buffer[nextToRead];
--itemsRemaining;
} while (nextToRead++ != available);
barrier.publish(available);
}
results[consumerIndex] = sum;
});
}
}
const auto start = std::chrono::high_resolution_clock::now();
// Publisher
for (uint64_t i = 0; i < iterationCount; ++i)
{
const auto seq = claimStrategy.claim_one();
buffer[seq] = i;
claimStrategy.publish(seq);
}
bool resultsOk = true;
for (int i = 0; i < consumerCount; ++i)
{
consumers[i].join();
if (results[i] != expectedResult)
{
resultsOk = false;
}
}
if (!resultsOk)
{
throw std::domain_error("Unexpected test result.");
}
const auto timeTaken = std::chrono::high_resolution_clock::now() - start;
const auto timeTakenUS = std::chrono::duration_cast<std::chrono::microseconds>(timeTaken).count();
return (iterationCount * 1000 * 1000) / timeTakenUS;
}
}
int main()
{
const int consumerCount = 3;
const size_t bufferSize = 64 * 1024;
const uint64_t iterationCount = 10 * 1000 * 1000;
const uint32_t runCount = 5;
std::cout << "Diamond Throughput Benchmark" << std::endl
<< "Consumer count: " << consumerCount << std::endl
<< "Buffer size: " << bufferSize << std::endl
<< "Iteration count: " << iterationCount << std::endl
<< "Run count: " << runCount << std::endl;
try
{
#define BENCHMARK(CS,WS) \
do { \
std::cout << #CS "/" #WS << std::endl; \
for (uint32_t run = 1; run <= runCount; ++run) \
{ \
const auto opsPerSecond = CalculateOpsPerSecond<disruptorplus::WS, disruptorplus::CS>(bufferSize, iterationCount, consumerCount); \
std::cout << "run " << run << " " << opsPerSecond << " ops/sec" << std::endl; \
} \
} while (false)
BENCHMARK(single_threaded_claim_strategy, spin_wait_strategy);
BENCHMARK(single_threaded_claim_strategy, blocking_wait_strategy);
BENCHMARK(multi_threaded_claim_strategy, spin_wait_strategy);
BENCHMARK(multi_threaded_claim_strategy, blocking_wait_strategy);
}
catch (std::exception& e)
{
std::cout << "error: " << e.what() << std::endl;
return 1;
}
return 0;
}

128
benchmark/multicast.cpp Normal file
View File

@ -0,0 +1,128 @@
#include <disruptorplus/single_threaded_claim_strategy.hpp>
#include <disruptorplus/multi_threaded_claim_strategy.hpp>
#include <disruptorplus/blocking_wait_strategy.hpp>
#include <disruptorplus/spin_wait_strategy.hpp>
#include <disruptorplus/ring_buffer.hpp>
#include <thread>
#include <chrono>
#include <cstdint>
#include <iostream>
namespace
{
template<typename WaitStrategy, template<typename T> class ClaimStrategy>
uint64_t CalculateOpsPerSecond(size_t bufferSize, uint64_t iterationCount, int consumerCount)
{
WaitStrategy waitStrategy;
std::vector<std::unique_ptr<disruptorplus::sequence_barrier<WaitStrategy>>> consumedBarriers(consumerCount);
ClaimStrategy<WaitStrategy> claimStrategy(bufferSize, waitStrategy);
disruptorplus::ring_buffer<uint64_t> buffer(bufferSize);
for (int i = 0; i < consumerCount; ++i)
{
consumedBarriers[i].reset(new disruptorplus::sequence_barrier<WaitStrategy>(waitStrategy));
claimStrategy.add_claim_barrier(*consumedBarriers[i]);
}
const uint64_t expectedResult = (iterationCount * (iterationCount - 1)) / 2;
std::vector<uint64_t> results(consumerCount);
std::vector<std::thread> consumers;
consumers.reserve(consumerCount);
// Consumers
for (int consumerIndex = 0; consumerIndex < consumerCount; ++consumerIndex)
{
consumers.emplace_back([&,consumerIndex]()
{
uint64_t sum = 0;
disruptorplus::sequence_t nextToRead = 0;
uint64_t itemsRemaining = iterationCount;
auto& barrier = *consumedBarriers[consumerIndex];
while (itemsRemaining > 0)
{
const auto available = claimStrategy.wait_until_published(nextToRead, nextToRead - 1);
do
{
sum += buffer[nextToRead];
--itemsRemaining;
} while (nextToRead++ != available);
barrier.publish(available);
}
results[consumerIndex] = sum;
});
}
const auto start = std::chrono::high_resolution_clock::now();
// Publisher
for (uint64_t i = 0; i < iterationCount; ++i)
{
const auto seq = claimStrategy.claim_one();
buffer[seq] = i;
claimStrategy.publish(seq);
}
bool resultsOk = true;
for (int i = 0; i < consumerCount; ++i)
{
consumers[i].join();
if (results[i] != expectedResult)
{
resultsOk = false;
}
}
if (!resultsOk)
{
throw std::domain_error("Unexpected test result.");
}
const auto timeTaken = std::chrono::high_resolution_clock::now() - start;
const auto timeTakenUS = std::chrono::duration_cast<std::chrono::microseconds>(timeTaken).count();
return (iterationCount * 1000 * 1000) / timeTakenUS;
}
}
int main()
{
const int consumerCount = 3;
const size_t bufferSize = 64 * 1024;
const uint64_t iterationCount = 10 * 1000 * 1000;
const uint32_t runCount = 5;
std::cout << "Multicast Throughput Benchmark" << std::endl
<< "Consumer count: " << consumerCount << std::endl
<< "Buffer size: " << bufferSize << std::endl
<< "Iteration count: " << iterationCount << std::endl
<< "Run count: " << runCount << std::endl;
try
{
#define BENCHMARK(CS,WS) \
do { \
std::cout << #CS "/" #WS << std::endl; \
for (uint32_t run = 1; run <= runCount; ++run) \
{ \
const auto opsPerSecond = CalculateOpsPerSecond<disruptorplus::WS, disruptorplus::CS>(bufferSize, iterationCount, consumerCount); \
std::cout << "run " << run << " " << opsPerSecond << " ops/sec" << std::endl; \
} \
} while (false)
BENCHMARK(single_threaded_claim_strategy, spin_wait_strategy);
BENCHMARK(single_threaded_claim_strategy, blocking_wait_strategy);
BENCHMARK(multi_threaded_claim_strategy, spin_wait_strategy);
BENCHMARK(multi_threaded_claim_strategy, blocking_wait_strategy);
}
catch (std::exception& e)
{
std::cout << "error: " << e.what() << std::endl;
return 1;
}
return 0;
}

154
benchmark/pipeline.cpp Normal file
View File

@ -0,0 +1,154 @@
#include <disruptorplus/single_threaded_claim_strategy.hpp>
#include <disruptorplus/multi_threaded_claim_strategy.hpp>
#include <disruptorplus/blocking_wait_strategy.hpp>
#include <disruptorplus/spin_wait_strategy.hpp>
#include <disruptorplus/ring_buffer.hpp>
#include <thread>
#include <chrono>
#include <cstdint>
#include <iostream>
namespace
{
template<typename WaitStrategy, template<typename T> class ClaimStrategy>
uint64_t CalculateOpsPerSecond(size_t bufferSize, uint64_t iterationCount, int consumerCount)
{
WaitStrategy waitStrategy;
std::vector<std::unique_ptr<disruptorplus::sequence_barrier<WaitStrategy>>> consumedBarriers(consumerCount);
ClaimStrategy<WaitStrategy> claimStrategy(bufferSize, waitStrategy);
disruptorplus::ring_buffer<uint64_t> buffer(bufferSize);
for (int i = 0; i < consumerCount; ++i)
{
consumedBarriers[i].reset(new disruptorplus::sequence_barrier<WaitStrategy>(waitStrategy));
}
claimStrategy.add_claim_barrier(*consumedBarriers.back());
const uint64_t expectedResult = (iterationCount * (iterationCount - 1)) / 2;
std::vector<uint64_t> results(consumerCount);
std::vector<std::thread> consumers;
consumers.reserve(consumerCount);
// Consumers
for (int consumerIndex = 0; consumerIndex < consumerCount; ++consumerIndex)
{
if (consumerIndex == 0)
{
consumers.emplace_back([&,consumerIndex]()
{
uint64_t sum = 0;
disruptorplus::sequence_t nextToRead = 0;
uint64_t itemsRemaining = iterationCount;
auto& barrier = *consumedBarriers[consumerIndex];
while (itemsRemaining > 0)
{
const auto available = claimStrategy.wait_until_published(nextToRead, nextToRead - 1);
do
{
sum += buffer[nextToRead];
--itemsRemaining;
} while (nextToRead++ != available);
barrier.publish(available);
}
results[consumerIndex] = sum;
});
}
else
{
consumers.emplace_back([&,consumerIndex]()
{
uint64_t sum = 0;
disruptorplus::sequence_t nextToRead = 0;
uint64_t itemsRemaining = iterationCount;
auto& barrier = *consumedBarriers[consumerIndex];
auto& prevBarrier = *consumedBarriers[consumerIndex - 1];
while (itemsRemaining > 0)
{
const auto available = prevBarrier.wait_until_published(nextToRead);
do
{
sum += buffer[nextToRead];
--itemsRemaining;
} while (nextToRead++ != available);
barrier.publish(available);
}
results[consumerIndex] = sum;
});
}
}
const auto start = std::chrono::high_resolution_clock::now();
// Publisher
for (uint64_t i = 0; i < iterationCount; ++i)
{
const auto seq = claimStrategy.claim_one();
buffer[seq] = i;
claimStrategy.publish(seq);
}
bool resultsOk = true;
for (int i = 0; i < consumerCount; ++i)
{
consumers[i].join();
if (results[i] != expectedResult)
{
resultsOk = false;
}
}
if (!resultsOk)
{
throw std::domain_error("Unexpected test result.");
}
const auto timeTaken = std::chrono::high_resolution_clock::now() - start;
const auto timeTakenUS = std::chrono::duration_cast<std::chrono::microseconds>(timeTaken).count();
return (iterationCount * 1000 * 1000) / timeTakenUS;
}
}
int main()
{
const int consumerCount = 3;
const size_t bufferSize = 64 * 1024;
const uint64_t iterationCount = 10 * 1000 * 1000;
const uint32_t runCount = 5;
std::cout << "Pipeline Throughput Benchmark" << std::endl
<< "Consumer count: " << consumerCount << std::endl
<< "Buffer size: " << bufferSize << std::endl
<< "Iteration count: " << iterationCount << std::endl
<< "Run count: " << runCount << std::endl;
try
{
#define BENCHMARK(CS,WS) \
do { \
std::cout << #CS "/" #WS << std::endl; \
for (uint32_t run = 1; run <= runCount; ++run) \
{ \
const auto opsPerSecond = CalculateOpsPerSecond<disruptorplus::WS, disruptorplus::CS>(bufferSize, iterationCount, consumerCount); \
std::cout << "run " << run << " " << opsPerSecond << " ops/sec" << std::endl; \
} \
} while (false)
BENCHMARK(single_threaded_claim_strategy, spin_wait_strategy);
BENCHMARK(single_threaded_claim_strategy, blocking_wait_strategy);
BENCHMARK(multi_threaded_claim_strategy, spin_wait_strategy);
BENCHMARK(multi_threaded_claim_strategy, blocking_wait_strategy);
}
catch (std::exception& e)
{
std::cout << "error: " << e.what() << std::endl;
return 1;
}
return 0;
}

111
benchmark/sequencer.cpp Normal file
View File

@ -0,0 +1,111 @@
#include <disruptorplus/single_threaded_claim_strategy.hpp>
#include <disruptorplus/multi_threaded_claim_strategy.hpp>
#include <disruptorplus/blocking_wait_strategy.hpp>
#include <disruptorplus/spin_wait_strategy.hpp>
#include <disruptorplus/ring_buffer.hpp>
#include <thread>
#include <chrono>
#include <cstdint>
#include <iostream>
namespace
{
template<typename WaitStrategy, template<typename T> class ClaimStrategy>
uint64_t CalculateOpsPerSecond(size_t bufferSize, uint64_t iterationCount, int producerCount)
{
WaitStrategy waitStrategy;
ClaimStrategy<WaitStrategy> claimStrategy(bufferSize, waitStrategy);
disruptorplus::sequence_barrier<WaitStrategy> consumed(waitStrategy);
claimStrategy.add_claim_barrier(consumed);
disruptorplus::ring_buffer<uint64_t> buffer(bufferSize);
const uint64_t expectedResult = producerCount * (iterationCount * (iterationCount - 1)) / 2;
std::vector<std::thread> producers;
producers.reserve(producerCount);
const auto start = std::chrono::high_resolution_clock::now();
// Producers
for (int producerIndex = 0; producerIndex < producerCount; ++producerIndex)
{
producers.emplace_back([&]()
{
for (uint64_t i = 0; i < iterationCount; ++i)
{
const auto seq = claimStrategy.claim_one();
buffer[seq] = i;
claimStrategy.publish(seq);
}
});
}
// Consumer
uint64_t sum = 0;
disruptorplus::sequence_t nextToRead = 0;
uint64_t itemsRemaining = iterationCount * producerCount;
while (itemsRemaining > 0)
{
const auto available = claimStrategy.wait_until_published(nextToRead, nextToRead - 1);
do
{
sum += buffer[nextToRead];
--itemsRemaining;
} while (nextToRead++ != available);
consumed.publish(available);
}
for (int i = 0; i < producerCount; ++i)
{
producers[i].join();
}
if (sum != expectedResult)
{
throw std::domain_error("Unexpected test result.");
}
const auto timeTaken = std::chrono::high_resolution_clock::now() - start;
const auto timeTakenUS = std::chrono::duration_cast<std::chrono::microseconds>(timeTaken).count();
return (producerCount * iterationCount * 1000 * 1000) / timeTakenUS;
}
}
int main()
{
const int producerCount = 3;
const size_t bufferSize = 64 * 1024;
const uint64_t iterationCount = 10 * 1000 * 1000;
const uint32_t runCount = 5;
std::cout << "Multicast Throughput Benchmark" << std::endl
<< "Producer count: " << producerCount << std::endl
<< "Buffer size: " << bufferSize << std::endl
<< "Iteration count: " << iterationCount << std::endl
<< "Run count: " << runCount << std::endl;
try
{
#define BENCHMARK(CS,WS) \
do { \
std::cout << #CS "/" #WS << std::endl; \
for (uint32_t run = 1; run <= runCount; ++run) \
{ \
const auto opsPerSecond = CalculateOpsPerSecond<disruptorplus::WS, disruptorplus::CS>(bufferSize, iterationCount, producerCount); \
std::cout << "run " << run << " " << opsPerSecond << " ops/sec" << std::endl; \
} \
} while (false)
BENCHMARK(multi_threaded_claim_strategy, spin_wait_strategy);
BENCHMARK(multi_threaded_claim_strategy, blocking_wait_strategy);
}
catch (std::exception& e)
{
std::cout << "error: " << e.what() << std::endl;
return 1;
}
return 0;
}

104
benchmark/unicast.cpp Normal file
View File

@ -0,0 +1,104 @@
#include <disruptorplus/single_threaded_claim_strategy.hpp>
#include <disruptorplus/multi_threaded_claim_strategy.hpp>
#include <disruptorplus/blocking_wait_strategy.hpp>
#include <disruptorplus/spin_wait_strategy.hpp>
#include <disruptorplus/ring_buffer.hpp>
#include <thread>
#include <chrono>
#include <cstdint>
#include <iostream>
namespace
{
template<typename WaitStrategy, template<typename T> class ClaimStrategy>
uint64_t CalculateOpsPerSecond(size_t bufferSize, uint64_t iterationCount)
{
WaitStrategy waitStrategy;
ClaimStrategy<WaitStrategy> claimStrategy(bufferSize, waitStrategy);
disruptorplus::sequence_barrier<WaitStrategy> consumed(waitStrategy);
claimStrategy.add_claim_barrier(consumed);
disruptorplus::ring_buffer<uint64_t> buffer(bufferSize);
const uint64_t expectedResult = (iterationCount * (iterationCount - 1)) / 2;
uint64_t result;
std::thread consumer([&]()
{
uint64_t sum = 0;
disruptorplus::sequence_t nextToRead = 0;
uint64_t itemsRemaining = iterationCount;
while (itemsRemaining > 0)
{
const auto available = claimStrategy.wait_until_published(nextToRead, nextToRead - 1);
do
{
sum += buffer[nextToRead];
--itemsRemaining;
} while (nextToRead++ != available);
consumed.publish(available);
}
result = sum;
});
const auto start = std::chrono::high_resolution_clock::now();
// Publisher
for (uint64_t i = 0; i < iterationCount; ++i)
{
const auto seq = claimStrategy.claim_one();
buffer[seq] = i;
claimStrategy.publish(seq);
}
consumer.join();
if (result != expectedResult)
{
throw std::domain_error("Unexpected test result.");
}
const auto timeTaken = std::chrono::high_resolution_clock::now() - start;
const auto timeTakenUS = std::chrono::duration_cast<std::chrono::microseconds>(timeTaken).count();
return (iterationCount * 1000 * 1000) / timeTakenUS;
}
}
int main()
{
const size_t bufferSize = 64 * 1024;
const uint64_t iterationCount = 10 * 1000 * 1000;
const uint32_t runCount = 5;
std::cout << "Unicast Throughput Benchmark" << std::endl
<< "Buffer size: " << bufferSize << std::endl
<< "Iteration count: " << iterationCount << std::endl
<< "Run count: " << runCount << std::endl;
try
{
#define BENCHMARK(CS,WS) \
do { \
std::cout << #CS "/" #WS << std::endl; \
for (uint32_t run = 1; run <= runCount; ++run) \
{ \
const auto opsPerSecond = CalculateOpsPerSecond<disruptorplus::WS, disruptorplus::CS>(bufferSize, iterationCount); \
std::cout << "run " << run << " " << opsPerSecond << " ops/sec" << std::endl; \
} \
} while (false)
BENCHMARK(single_threaded_claim_strategy, spin_wait_strategy);
BENCHMARK(single_threaded_claim_strategy, blocking_wait_strategy);
BENCHMARK(multi_threaded_claim_strategy, spin_wait_strategy);
BENCHMARK(multi_threaded_claim_strategy, blocking_wait_strategy);
}
catch (std::exception& e)
{
std::cout << "error: " << e.what() << std::endl;
return 1;
}
return 0;
}

View File

@ -6,5 +6,6 @@
from cake.tools import script
script.execute(script.cwd([
"benchmark/build.cake",
"test/build.cake",
]))

View File

@ -111,6 +111,7 @@ if cake.system.isWindows() or cake.system.isCygwin():
architecture=arch,
version=msvcVer,
)
compiler.addCppFlag("/Zc:forScope")
compiler.addDefine("WIN32")
compiler.addDefine("_WIN32_WINNT", "0x0500") # WinXP
compiler.addDefine("NOMINMAX")