Multithreaded tests are reenabled

This commit is contained in:
Dominic Hamon 2014-01-07 13:14:41 -08:00
parent 15bf66750c
commit 2923a481ed
4 changed files with 137 additions and 107 deletions

View File

@ -129,6 +129,7 @@ static void BM_MultiThreaded(benchmark::State& state) {
// Teardown code here.
}
}
BENCHMARK(BM_MultiThreaded)->Threads(4);
*/
#ifndef BENCHMARK_BENCHMARK_H_
@ -137,6 +138,7 @@ static void BM_MultiThreaded(benchmark::State& state) {
#include <stdint.h>
#include <functional>
#include <memory>
#include <string>
#include <vector>
@ -226,6 +228,7 @@ class State {
private:
class FastClock;
struct SharedState;
struct ThreadStats;
State(FastClock* clock, SharedState* s, int t);
bool StartRunning();
@ -234,7 +237,10 @@ class State {
void NewInterval();
bool AllStarting();
static void* RunWrapper(void* arg);
void Run();
void RunAsThread();
void Wait();
enum EState {
STATE_INITIAL, // KeepRunning hasn't been called
@ -242,7 +248,9 @@ class State {
STATE_RUNNING, // Running and being timed
STATE_STOPPING, // Not being timed but waiting for other threads
STATE_STOPPED, // Stopped
} state_;
};
EState state_;
FastClock* clock_;
@ -250,6 +258,8 @@ class State {
// BenchmarkInstance
SharedState* shared_;
pthread_t thread_;
// Custom label set by the user.
std::string label_;
@ -274,6 +284,8 @@ class State {
// True if the current interval is the continuation of a previous one.
bool is_continuation_;
std::unique_ptr<ThreadStats> stats_;
friend class internal::Benchmark;
DISALLOW_COPY_AND_ASSIGN(State);
};
@ -345,8 +357,7 @@ class Benchmark {
// of some piece of code.
// Run one instance of this benchmark concurrently in t threads.
// TODO(dominic): Allow multithreaded benchmarks
//Benchmark* Threads(int t);
Benchmark* Threads(int t);
// Pick a set of values T from [min_threads,max_threads].
// min_threads and max_threads are always included in T. Run this
@ -360,10 +371,10 @@ class Benchmark {
// Foo in 4 threads
// Foo in 8 threads
// Foo in 16 threads
// Benchmark* ThreadRange(int min_threads, int max_threads);
Benchmark* ThreadRange(int min_threads, int max_threads);
// Equivalent to ThreadRange(NumCPUs(), NumCPUs())
//Benchmark* ThreadPerCpu();
Benchmark* ThreadPerCpu();
// TODO(dominic): Control whether or not real-time is used for this benchmark
@ -372,7 +383,6 @@ class Benchmark {
// Used inside the benchmark implementation
struct Instance;
struct ThreadStats;
// Extract the list of benchmark instances that match the specified
// regular expression.

View File

@ -36,6 +36,7 @@ char (&ArraySizeHelper(const T (&array)[N]))[N];
#define CHECK(b) do { if (!(b)) assert(false); } while(0)
#define CHECK_EQ(a, b) CHECK((a) == (b))
#define CHECK_NE(a, b) CHECK((a) != (b))
#define CHECK_GE(a, b) CHECK((a) >= (b))
#define CHECK_LE(a, b) CHECK((a) <= (b))
#define CHECK_GT(a, b) CHECK((a) > (b))

View File

@ -174,34 +174,6 @@ inline std::string HumanReadableNumber(double n) {
return ToBinaryStringFullySpecified(n, 1.1, 1);
}
} // end namespace
namespace internal {
struct Benchmark::ThreadStats {
int64_t bytes_processed;
int64_t items_processed;
ThreadStats() { Reset(); }
void Reset() {
bytes_processed = 0;
items_processed = 0;
}
void Add(const ThreadStats& other) {
bytes_processed += other.bytes_processed;
items_processed += other.items_processed;
}
};
} // end namespace internal
namespace {
// Per-thread stats
pthread_key_t thread_stats_key;
internal::Benchmark::ThreadStats* thread_stats = nullptr;
// For non-dense Range, intermediate values are powers of kRangeMultiplier.
static const int kRangeMultiplier = 8;
@ -210,6 +182,9 @@ static const int kRangeMultiplier = 8;
static pthread_mutex_t benchmark_mutex;
static std::vector<internal::Benchmark*>* families = NULL;
pthread_mutex_t starting_mutex;
pthread_cond_t starting_cv;
bool running_benchmark = false;
// Should this benchmark report memory usage?
@ -222,10 +197,6 @@ bool use_real_time;
// Overhead of an empty benchmark.
double overhead = 0.0;
void DeleteThreadStats(void* p) {
delete (internal::Benchmark::ThreadStats*) p;
}
// Return prefix to print in front of each reported line
const char* Prefix() {
#ifdef NDEBUG
@ -534,9 +505,24 @@ class State::FastClock {
DISALLOW_COPY_AND_ASSIGN(FastClock);
};
namespace internal {
struct State::ThreadStats {
int64_t bytes_processed;
int64_t items_processed;
const int Benchmark::kNumCpuMarker;
ThreadStats() { Reset(); }
void Reset() {
bytes_processed = 0;
items_processed = 0;
}
void Add(const ThreadStats& other) {
bytes_processed += other.bytes_processed;
items_processed += other.items_processed;
}
};
namespace internal {
// Information kept per benchmark we may want to run
struct Benchmark::Instance {
@ -563,12 +549,13 @@ struct State::SharedState {
int starting; // Number of threads that have entered STARTING state
int stopping; // Number of threads that have entered STOPPING state
int threads; // Number of total threads that are running concurrently
internal::Benchmark::ThreadStats stats;
ThreadStats stats;
std::vector<internal::BenchmarkRunData> runs; // accumulated runs
std::string label;
SharedState(const internal::Benchmark::Instance* b, int t)
: instance(b), starting(0), stopping(0), threads(t) {
explicit SharedState(const internal::Benchmark::Instance* b)
: instance(b), starting(0), stopping(0),
threads(b == nullptr ? 1 : b->threads) {
pthread_mutex_init(&mu, nullptr);
}
@ -647,7 +634,7 @@ Benchmark* Benchmark::Apply(void (*custom_arguments)(Benchmark* benchmark)) {
custom_arguments(this);
return this;
}
/*
Benchmark* Benchmark::Threads(int t) {
CHECK_GT(t, 0);
mutex_lock l(&benchmark_mutex);
@ -666,10 +653,10 @@ Benchmark* Benchmark::ThreadRange(int min_threads, int max_threads) {
Benchmark* Benchmark::ThreadPerCpu() {
mutex_lock l(&benchmark_mutex);
thread_counts_.push_back(kNumCpuMarker);
thread_counts_.push_back(NumCPUs());
return this;
}
*/
void Benchmark::AddRange(std::vector<int>* dst, int lo, int hi, int mult) {
CHECK_GE(lo, 0);
CHECK_GE(hi, lo);
@ -697,12 +684,10 @@ std::vector<Benchmark::Instance> Benchmark::CreateBenchmarkInstances(
std::vector<Benchmark::Instance> instances;
const bool is_multithreaded = (!thread_counts_.empty());
const std::vector<int>* thread_counts =
(is_multithreaded ? &thread_counts_ : &one_thread);
for (size_t t = 0; t < thread_counts->size(); ++t) {
int num_threads = (*thread_counts)[t];
if (num_threads == kNumCpuMarker)
num_threads = NumCPUs();
const std::vector<int>& thread_counts =
(is_multithreaded ? thread_counts_ : one_thread);
for (size_t t = 0; t < thread_counts.size(); ++t) {
int num_threads = thread_counts[t];
Instance instance;
instance.name = name_;
@ -785,7 +770,7 @@ void Benchmark::FindBenchmarks(const std::string& spec,
void Benchmark::MeasureOverhead() {
State::FastClock clock(State::FastClock::CPU_TIME);
State::SharedState state(NULL, 1);
State::SharedState state(nullptr);
State runner(&clock, &state, 0);
while (runner.KeepRunning()) {}
overhead = state.runs[0].real_accumulated_time /
@ -802,28 +787,22 @@ void Benchmark::RunInstance(const Instance& b, BenchmarkReporter* br) {
State::FastClock clock(State::FastClock::CPU_TIME);
// Initialize the test runners.
State::SharedState state(&b, b.threads);
State::SharedState state(&b);
{
std::unique_ptr<State> runners[b.threads];
// TODO: create thread objects
for (int i = 0; i < b.threads; ++i)
runners[i].reset(new State(&clock, &state, i));
// Run them all.
for (int i = 0; i < b.threads; ++i) {
State* r = runners[i].release();
if (b.multithreaded()) {
// TODO: start pthreads (member of state?) and set up thread local
// pointers to stats
//pool->Add(base::NewCallback(r, &State::Run));
} else {
pthread_setspecific(thread_stats_key, thread_stats);
r->Run();
}
if (b.multithreaded())
runners[i]->RunAsThread();
else
runners[i]->Run();
}
if (b.multithreaded()) {
// TODO: join all the threads
//pool->JoinAll();
for (int i = 0; i < b.threads; ++i)
runners[i]->Wait();
}
}
/*
@ -849,7 +828,6 @@ void Benchmark::RunInstance(const Instance& b, BenchmarkReporter* br) {
for (internal::BenchmarkRunData& report : state.runs) {
double seconds = (use_real_time ? report.real_accumulated_time :
report.cpu_accumulated_time);
// TODO: add the thread index here?
report.benchmark_name = b.name;
report.report_label = state.label;
report.bytes_per_second = state.stats.bytes_processed / seconds;
@ -890,7 +868,7 @@ double Benchmark::MeasurePeakHeapMemory(const Instance& b) {
}
} // end namespace internal
State::State(FastClock* clock, SharedState* s, int t)
: thread_index(t),
state_(STATE_INITIAL),
@ -905,9 +883,12 @@ State::State(FastClock* clock, SharedState* s, int t)
total_iterations_(0),
interval_micros_(
static_cast<int64_t>(kNumMicrosPerSecond * FLAGS_benchmark_min_time /
FLAGS_benchmark_repetitions)) {
FLAGS_benchmark_repetitions)),
is_continuation_(false),
stats_(new ThreadStats()) {
CHECK(clock != nullptr);
CHECK(s != nullptr);
}
bool State::KeepRunning() {
@ -941,17 +922,13 @@ void State::ResumeTiming() {
void State::SetBytesProcessed(int64_t bytes) {
CHECK_EQ(STATE_STOPPED, state_);
mutex_lock l(&shared_->mu);
internal::Benchmark::ThreadStats* thread_stats =
(internal::Benchmark::ThreadStats*) pthread_getspecific(thread_stats_key);
thread_stats->bytes_processed = bytes;
stats_->bytes_processed = bytes;
}
void State::SetItemsProcessed(int64_t items) {
CHECK_EQ(STATE_STOPPED, state_);
mutex_lock l(&shared_->mu);
internal::Benchmark::ThreadStats* thread_stats =
(internal::Benchmark::ThreadStats*) pthread_getspecific(thread_stats_key);
thread_stats->items_processed = items;
stats_->items_processed = items;
}
void State::SetLabel(const std::string& label) {
@ -980,6 +957,7 @@ int State::range_y() const {
}
bool State::StartRunning() {
bool last_thread = false;
{
mutex_lock l(&shared_->mu);
CHECK_EQ(state_, STATE_INITIAL);
@ -987,29 +965,40 @@ bool State::StartRunning() {
is_continuation_ = false;
CHECK_LT(shared_->starting, shared_->threads);
++shared_->starting;
if (shared_->starting == shared_->threads) {
// Last thread to start.
clock_->InitType(
use_real_time ? FastClock::REAL_TIME : FastClock::CPU_TIME);
} else {
// Wait for others.
// TODO(dominic): semaphore!
// while (pthread_getsemaphore(shared_->starting_sem_) !=
// shared_->threads) { }
//shared_->mu.Await(base::Condition(this, &State::AllStarting));
}
CHECK_EQ(state_, STATE_STARTING);
state_ = STATE_RUNNING;
#ifdef DEBUG
std::cout << "[" << thread_index << "] "
<< shared_->starting << "/" << shared_->threads << " starting\n";
#endif
last_thread = shared_->starting == shared_->threads;
}
if (last_thread) {
clock_->InitType(
use_real_time ? FastClock::REAL_TIME : FastClock::CPU_TIME);
#ifdef DEBUG
std::cout << "[" << thread_index << "] unlocking\n";
#endif
{
mutex_lock l(&starting_mutex);
pthread_cond_broadcast(&starting_cv);
}
} else {
#ifdef DEBUG
std::cout << "[" << thread_index << "] waiting\n";
#endif
mutex_lock l(&starting_mutex);
pthread_cond_wait(&starting_cv, &starting_mutex);
#ifdef DEBUG
std::cout << "[" << thread_index << "] unlocked\n";
#endif
}
CHECK_EQ(state_, STATE_STARTING);
state_ = STATE_RUNNING;
NewInterval();
return true;
}
bool State::AllStarting() {
CHECK_LE(shared_->starting, shared_->threads);
return shared_->starting == shared_->threads;
}
void State::NewInterval() {
stop_time_micros_ = clock_->NowMicros() + interval_micros_;
if (!is_continuation_) {
@ -1107,16 +1096,30 @@ bool State::MaybeStop() {
}
void State::Run() {
internal::Benchmark::ThreadStats* thread_stats =
(internal::Benchmark::ThreadStats*) pthread_getspecific(thread_stats_key);
thread_stats->Reset();
stats_->Reset();
shared_->instance->bm->function_(*this);
{
mutex_lock l(&shared_->mu);
shared_->stats.Add(*thread_stats);
shared_->stats.Add(*stats_);
}
}
void State::RunAsThread() {
CHECK_EQ(0, pthread_create(&thread_, nullptr, &State::RunWrapper, this));
}
void State::Wait() {
CHECK_EQ(0, pthread_join(thread_, nullptr));
}
// static
void* State::RunWrapper(void* arg) {
State* that = (State*)arg;
CHECK(that != nullptr);
that->Run();
return nullptr;
}
namespace internal {
void RunMatchingBenchmarks(const std::string& spec,
@ -1185,13 +1188,16 @@ void RunSpecifiedBenchmarks() {
spec = "."; // Regexp that matches all benchmarks
internal::ConsoleReporter default_reporter;
internal::RunMatchingBenchmarks(spec, &default_reporter);
pthread_cond_destroy(&starting_cv);
pthread_mutex_destroy(&starting_mutex);
pthread_mutex_destroy(&benchmark_mutex);
}
void Initialize(int* argc, const char** argv) {
//AtomicOps_Internalx86CPUFeaturesInit();
pthread_mutex_init(&benchmark_mutex, nullptr);
pthread_key_create(&thread_stats_key, DeleteThreadStats);
thread_stats = new internal::Benchmark::ThreadStats();
pthread_mutex_init(&starting_mutex, nullptr);
pthread_cond_init(&starting_cv, nullptr);
walltime::Initialize();
internal::ParseCommandLineFlags(argc, argv);
internal::Benchmark::MeasureOverhead();

View File

@ -3,6 +3,7 @@
#include <math.h>
#include <stdint.h>
#include <iostream>
#include <limits>
#include <list>
#include <map>
@ -33,7 +34,8 @@ std::set<int> ConstructRandomSet(int size) {
return s;
}
static std::vector<int>* test_vector = NULL;
pthread_mutex_t test_vector_mu;
std::vector<int>* test_vector = nullptr;
} // end namespace
@ -57,7 +59,7 @@ static void BM_CalculatePiRange(benchmark::State& state) {
state.SetLabel(ss.str());
}
BENCHMARK_RANGE(BM_CalculatePiRange, 1, 1024 * 1024);
/*
static void BM_CalculatePi(benchmark::State& state) {
static const int depth = 1024;
double pi ATTRIBUTE_UNUSED = 0.0;
@ -68,7 +70,7 @@ static void BM_CalculatePi(benchmark::State& state) {
BENCHMARK(BM_CalculatePi)->Threads(8);
BENCHMARK(BM_CalculatePi)->ThreadRange(1, 32);
BENCHMARK(BM_CalculatePi)->ThreadPerCpu();
*/
static void BM_SetInsert(benchmark::State& state) {
while (state.KeepRunning()) {
state.PauseTiming();
@ -107,16 +109,27 @@ static void BM_StringCompare(benchmark::State& state) {
BENCHMARK(BM_StringCompare)->Range(1, 1<<20);
static void BM_SetupTeardown(benchmark::State& state) {
if (state.thread_index == 0)
if (state.thread_index == 0) {
pthread_mutex_init(&test_vector_mu, nullptr);
// No need to lock test_vector_mu here as this is running single-threaded.
test_vector = new std::vector<int>();
while (state.KeepRunning())
test_vector->push_back(0);
}
int i = 0;
while (state.KeepRunning()) {
pthread_mutex_lock(&test_vector_mu);
if (i%2 == 0)
test_vector->push_back(i);
else
test_vector->pop_back();
pthread_mutex_unlock(&test_vector_mu);
++i;
}
if (state.thread_index == 0) {
delete test_vector;
test_vector = NULL;
pthread_mutex_destroy(&test_vector_mu);
}
}
BENCHMARK(BM_SetupTeardown);
BENCHMARK(BM_SetupTeardown)->ThreadPerCpu();
static void BM_LongTest(benchmark::State& state) {
double tracker = 0.0;