[OpenMP] libomp: Add new experimental barrier: two-level distributed barrier

Two-level distributed barrier is a new experimental barrier designed
for Intel hardware that has better performance in some cases than the
default hyper barrier.

This barrier is designed to handle fine granularity parallelism where
barriers are used frequently with little compute and memory access
between barriers. There is no need to use it for codes with few
barriers and large granularity compute, or memory intensive
applications, as little difference will be seen between this barrier
and the default hyper barrier. This barrier is designed to work
optimally with a fixed number of threads, and has a significant setup
time, so should NOT be used in situations where the number of threads
in a team is varied frequently.

The two-level distributed barrier is off by default -- hyper barrier
is used by default. To use this barrier, you must set all barrier
patterns to use this type, because it will not work with other barrier
patterns. Thus, to turn it on, the following settings are required:

KMP_FORKJOIN_BARRIER_PATTERN=dist,dist
KMP_PLAIN_BARRIER_PATTERN=dist,dist
KMP_REDUCTION_BARRIER_PATTERN=dist,dist

Branching factors (set with KMP_FORKJOIN_BARRIER, KMP_PLAIN_BARRIER,
and KMP_REDUCTION_BARRIER) are ignored by the two-level distributed
barrier.

Patch fixed for ITTNotify disabled builds and non-x86 builds

Co-authored-by: Jonathan Peyton <jonathan.l.peyton@intel.com>
Co-authored-by: Vladislav Vinogradov <vlad.vinogradov@intel.com>

Differential Revision: https://reviews.llvm.org/D103121
This commit is contained in:
Terry Wilmarth 2021-07-15 10:28:47 -05:00 committed by Peyton, Jonathan L
parent 84a4caeb84
commit d8e4cb9121
20 changed files with 1589 additions and 457 deletions

View File

@ -109,6 +109,23 @@ if (NOT LIBOMP_HAVE_SHM_OPEN_NO_LRT)
set(CMAKE_REQUIRED_LIBRARIES)
endif()
# Check for aligned memory allocator function
check_include_file(xmmintrin.h LIBOMP_HAVE_XMMINTRIN_H)
set(OLD_CMAKE_REQUIRED_FLAGS ${CMAKE_REQUIRED_FLAGS})
if (LIBOMP_HAVE_XMMINTRIN_H)
set(CMAKE_REQUIRED_FLAGS "${CMAKE_REQUIRED_FLAGS} -DLIBOMP_HAVE_XMMINTRIN_H")
endif()
set(source_code "// check for _mm_malloc
#ifdef LIBOMP_HAVE_XMMINTRIN_H
#include <xmmintrin.h>
#endif
int main() { void *ptr = _mm_malloc(sizeof(int) * 1000, 64); _mm_free(ptr); return 0; }")
check_cxx_source_compiles("${source_code}" LIBOMP_HAVE__MM_MALLOC)
set(CMAKE_REQUIRED_FLAGS ${OLD_CMAKE_REQUIRED_FLAGS})
check_symbol_exists(aligned_alloc "stdlib.h" LIBOMP_HAVE_ALIGNED_ALLOC)
check_symbol_exists(posix_memalign "stdlib.h" LIBOMP_HAVE_POSIX_MEMALIGN)
check_symbol_exists(_aligned_malloc "malloc.h" LIBOMP_HAVE__ALIGNED_MALLOC)
# Check linker flags
if(WIN32)
libomp_check_linker_flag(/SAFESEH LIBOMP_HAVE_SAFESEH_FLAG)

View File

@ -269,6 +269,7 @@ Using_int_Value "%1$s value \"%2$d\" will be used."
Using_uint_Value "%1$s value \"%2$u\" will be used."
Using_uint64_Value "%1$s value \"%2$s\" will be used."
Using_str_Value "%1$s value \"%2$s\" will be used."
BarrierPatternOverride "Mixing other barrier patterns with dist is prohibited. Using dist for all barrier patterns."
MaxValueUsing "%1$s maximum value \"%2$d\" will be used."
MinValueUsing "%1$s minimum value \"%2$d\" will be used."
MemoryAllocFailed "Memory allocation failed."

View File

@ -115,6 +115,7 @@ typedef unsigned int kmp_hwloc_depth_t;
#include "kmp_debug.h"
#include "kmp_lock.h"
#include "kmp_version.h"
#include "kmp_barrier.h"
#if USE_DEBUGGER
#include "kmp_debugger.h"
#endif
@ -263,6 +264,7 @@ typedef union kmp_root kmp_root_p;
template <bool C = false, bool S = true> class kmp_flag_32;
template <bool C = false, bool S = true> class kmp_flag_64;
template <bool C = false, bool S = true> class kmp_atomic_flag_64;
class kmp_flag_oncore;
#ifdef __cplusplus
@ -1879,6 +1881,15 @@ typedef struct kmp_disp {
0 // Thread th_reap_state: not safe to reap (tasking)
#define KMP_SAFE_TO_REAP 1 // Thread th_reap_state: safe to reap (not tasking)
// The flag_type describes the storage used for the flag.
enum flag_type {
flag32, /**< atomic 32 bit flags */
flag64, /**< 64 bit flags */
atomic_flag64, /**< atomic 64 bit flags */
flag_oncore, /**< special 64-bit flag for on-core barrier (hierarchical) */
flag_unset
};
enum barrier_type {
bs_plain_barrier = 0, /* 0, All non-fork/join barriers (except reduction
barriers if enabled) */
@ -1902,6 +1913,7 @@ typedef enum kmp_bar_pat { /* Barrier communication patterns */
bp_hyper_bar = 2, /* Hypercube-embedded tree with min
branching factor 2^n */
bp_hierarchical_bar = 3, /* Machine hierarchy tree */
bp_dist_bar = 4, /* Distributed barrier */
bp_last_bar /* Placeholder to mark the end */
} kmp_bar_pat_e;
@ -2626,6 +2638,7 @@ typedef struct KMP_ALIGN_CACHE kmp_base_info {
/* while awaiting queuing lock acquire */
volatile void *th_sleep_loc; // this points at a kmp_flag<T>
flag_type th_sleep_loc_type; // enum type of flag stored in th_sleep_loc
ident_t *th_ident;
unsigned th_x; // Random number generator data
@ -2646,6 +2659,9 @@ typedef struct KMP_ALIGN_CACHE kmp_base_info {
written by the worker thread) */
kmp_uint8 th_active_in_pool; // included in count of #active threads in pool
int th_active; // ! sleeping; 32 bits for TCR/TCW
std::atomic<kmp_uint32> th_used_in_team; // Flag indicating use in team
// 0 = not used in team; 1 = used in team;
// 2 = transitioning to not used in team; 3 = transitioning to used in team
struct cons_header *th_cons; // used for consistency check
#if KMP_USE_HIER_SCHED
// used for hierarchical scheduling
@ -2825,6 +2841,7 @@ typedef struct KMP_ALIGN_CACHE kmp_base_team {
#if USE_ITT_BUILD
void *t_stack_id; // team specific stack stitching id (for ittnotify)
#endif /* USE_ITT_BUILD */
distributedBarrier *b; // Distributed barrier data associated with team
} kmp_base_team_t;
union KMP_ALIGN_CACHE kmp_team {
@ -4126,18 +4143,26 @@ template <bool C, bool S>
extern void __kmp_suspend_32(int th_gtid, kmp_flag_32<C, S> *flag);
template <bool C, bool S>
extern void __kmp_suspend_64(int th_gtid, kmp_flag_64<C, S> *flag);
template <bool C, bool S>
extern void __kmp_atomic_suspend_64(int th_gtid,
kmp_atomic_flag_64<C, S> *flag);
extern void __kmp_suspend_oncore(int th_gtid, kmp_flag_oncore *flag);
#if KMP_HAVE_MWAIT || KMP_HAVE_UMWAIT
template <bool C, bool S>
extern void __kmp_mwait_32(int th_gtid, kmp_flag_32<C, S> *flag);
template <bool C, bool S>
extern void __kmp_mwait_64(int th_gtid, kmp_flag_64<C, S> *flag);
template <bool C, bool S>
extern void __kmp_atomic_mwait_64(int th_gtid, kmp_atomic_flag_64<C, S> *flag);
extern void __kmp_mwait_oncore(int th_gtid, kmp_flag_oncore *flag);
#endif
template <bool C, bool S>
extern void __kmp_resume_32(int target_gtid, kmp_flag_32<C, S> *flag);
template <bool C, bool S>
extern void __kmp_resume_64(int target_gtid, kmp_flag_64<C, S> *flag);
template <bool C, bool S>
extern void __kmp_atomic_resume_64(int target_gtid,
kmp_atomic_flag_64<C, S> *flag);
extern void __kmp_resume_oncore(int target_gtid, kmp_flag_oncore *flag);
template <bool C, bool S>
@ -4156,6 +4181,14 @@ int __kmp_execute_tasks_64(kmp_info_t *thread, kmp_int32 gtid,
void *itt_sync_obj,
#endif /* USE_ITT_BUILD */
kmp_int32 is_constrained);
template <bool C, bool S>
int __kmp_atomic_execute_tasks_64(kmp_info_t *thread, kmp_int32 gtid,
kmp_atomic_flag_64<C, S> *flag,
int final_spin, int *thread_finished,
#if USE_ITT_BUILD
void *itt_sync_obj,
#endif /* USE_ITT_BUILD */
kmp_int32 is_constrained);
int __kmp_execute_tasks_oncore(kmp_info_t *thread, kmp_int32 gtid,
kmp_flag_oncore *flag, int final_spin,
int *thread_finished,

View File

@ -732,7 +732,7 @@ static inline kmp_cmplx128_a16_t operator/(kmp_cmplx128_a16_t &lhs,
#define OP_UPDATE_CRITICAL(TYPE, OP, LCK_ID) \
__kmp_acquire_atomic_lock(&ATOMIC_LOCK##LCK_ID, gtid); \
(*lhs) = (TYPE)((*lhs)OP((TYPE)rhs)); \
(*lhs) = (TYPE)((*lhs)OP rhs); \
__kmp_release_atomic_lock(&ATOMIC_LOCK##LCK_ID, gtid);
// ------------------------------------------------------------------------
@ -791,14 +791,14 @@ static inline kmp_cmplx128_a16_t operator/(kmp_cmplx128_a16_t &lhs,
{ \
TYPE old_value, new_value; \
old_value = *(TYPE volatile *)lhs; \
new_value = (TYPE)(old_value OP((TYPE)rhs)); \
new_value = (TYPE)(old_value OP rhs); \
while (!KMP_COMPARE_AND_STORE_ACQ##BITS( \
(kmp_int##BITS *)lhs, *VOLATILE_CAST(kmp_int##BITS *) & old_value, \
*VOLATILE_CAST(kmp_int##BITS *) & new_value)) { \
KMP_DO_PAUSE; \
\
old_value = *(TYPE volatile *)lhs; \
new_value = (TYPE)(old_value OP((TYPE)rhs)); \
new_value = (TYPE)(old_value OP rhs); \
} \
}

View File

@ -10,12 +10,14 @@
//
//===----------------------------------------------------------------------===//
#include "kmp.h"
#include "kmp_wait_release.h"
#include "kmp_barrier.h"
#include "kmp_itt.h"
#include "kmp_os.h"
#include "kmp_stats.h"
#include "ompt-specific.h"
// for distributed barrier
#include "kmp_affinity.h"
#if KMP_MIC
#include <immintrin.h>
@ -38,6 +40,516 @@
void __kmp_print_structure(void); // Forward declaration
// ---------------------------- Barrier Algorithms ----------------------------
// Distributed barrier
// Compute how many threads to have polling each cache-line.
// We want to limit the number of writes to IDEAL_GO_RESOLUTION.
void distributedBarrier::computeVarsForN(size_t n) {
int nsockets = 1;
if (__kmp_topology) {
int socket_level = __kmp_topology->get_level(KMP_HW_SOCKET);
int core_level = __kmp_topology->get_level(KMP_HW_CORE);
int ncores_per_socket =
__kmp_topology->calculate_ratio(core_level, socket_level);
nsockets = __kmp_topology->get_count(socket_level);
if (nsockets <= 0)
nsockets = 1;
if (ncores_per_socket <= 0)
ncores_per_socket = 1;
threads_per_go = ncores_per_socket >> 1;
if (!fix_threads_per_go) {
// Minimize num_gos
if (threads_per_go > 4) {
if (KMP_OPTIMIZE_FOR_REDUCTIONS) {
threads_per_go = threads_per_go >> 1;
}
if (threads_per_go > 4 && nsockets == 1)
threads_per_go = threads_per_go >> 1;
}
}
if (threads_per_go == 0)
threads_per_go = 1;
fix_threads_per_go = true;
num_gos = n / threads_per_go;
if (n % threads_per_go)
num_gos++;
if (nsockets == 1 || num_gos == 1)
num_groups = 1;
else {
num_groups = num_gos / nsockets;
if (num_gos % nsockets)
num_groups++;
}
if (num_groups <= 0)
num_groups = 1;
gos_per_group = num_gos / num_groups;
if (num_gos % num_groups)
gos_per_group++;
threads_per_group = threads_per_go * gos_per_group;
} else {
num_gos = n / threads_per_go;
if (n % threads_per_go)
num_gos++;
if (num_gos == 1)
num_groups = 1;
else {
num_groups = num_gos / 2;
if (num_gos % 2)
num_groups++;
}
gos_per_group = num_gos / num_groups;
if (num_gos % num_groups)
gos_per_group++;
threads_per_group = threads_per_go * gos_per_group;
}
}
void distributedBarrier::computeGo(size_t n) {
// Minimize num_gos
for (num_gos = 1;; num_gos++)
if (IDEAL_CONTENTION * num_gos >= n)
break;
threads_per_go = n / num_gos;
if (n % num_gos)
threads_per_go++;
while (num_gos > MAX_GOS) {
threads_per_go++;
num_gos = n / threads_per_go;
if (n % threads_per_go)
num_gos++;
}
computeVarsForN(n);
}
// This function is to resize the barrier arrays when the new number of threads
// exceeds max_threads, which is the current size of all the arrays
void distributedBarrier::resize(size_t nthr) {
KMP_DEBUG_ASSERT(nthr > max_threads);
// expand to requested size * 2
max_threads = nthr * 2;
// allocate arrays to new max threads
for (int i = 0; i < MAX_ITERS; ++i) {
if (flags[i])
flags[i] = (flags_s *)KMP_INTERNAL_REALLOC(flags[i],
max_threads * sizeof(flags_s));
else
flags[i] = (flags_s *)KMP_INTERNAL_MALLOC(max_threads * sizeof(flags_s));
}
if (go)
go = (go_s *)KMP_INTERNAL_REALLOC(go, max_threads * sizeof(go_s));
else
go = (go_s *)KMP_INTERNAL_MALLOC(max_threads * sizeof(go_s));
if (iter)
iter = (iter_s *)KMP_INTERNAL_REALLOC(iter, max_threads * sizeof(iter_s));
else
iter = (iter_s *)KMP_INTERNAL_MALLOC(max_threads * sizeof(iter_s));
if (sleep)
sleep =
(sleep_s *)KMP_INTERNAL_REALLOC(sleep, max_threads * sizeof(sleep_s));
else
sleep = (sleep_s *)KMP_INTERNAL_MALLOC(max_threads * sizeof(sleep_s));
}
// This function is to set all the go flags that threads might be waiting
// on, and when blocktime is not infinite, it should be followed by a wake-up
// call to each thread
kmp_uint64 distributedBarrier::go_release() {
kmp_uint64 next_go = iter[0].iter + distributedBarrier::MAX_ITERS;
for (size_t j = 0; j < num_gos; j++) {
go[j].go.store(next_go);
}
return next_go;
}
void distributedBarrier::go_reset() {
for (size_t j = 0; j < max_threads; ++j) {
for (size_t i = 0; i < distributedBarrier::MAX_ITERS; ++i) {
flags[i][j].stillNeed = 1;
}
go[j].go.store(0);
iter[j].iter = 0;
}
}
// This function inits/re-inits the distributed barrier for a particular number
// of threads. If a resize of arrays is needed, it calls the resize function.
void distributedBarrier::init(size_t nthr) {
size_t old_max = max_threads;
if (nthr > max_threads) { // need more space in arrays
resize(nthr);
}
for (size_t i = 0; i < max_threads; i++) {
for (size_t j = 0; j < distributedBarrier::MAX_ITERS; j++) {
flags[j][i].stillNeed = 1;
}
go[i].go.store(0);
iter[i].iter = 0;
if (i >= old_max)
sleep[i].sleep = false;
}
// Recalculate num_gos, etc. based on new nthr
computeVarsForN(nthr);
num_threads = nthr;
if (team_icvs == NULL)
team_icvs = __kmp_allocate(sizeof(kmp_internal_control_t));
}
// This function is used only when KMP_BLOCKTIME is not infinite.
// static
void __kmp_dist_barrier_wakeup(enum barrier_type bt, kmp_team_t *team,
size_t start, size_t stop, size_t inc,
size_t tid) {
KMP_DEBUG_ASSERT(__kmp_dflt_blocktime != KMP_MAX_BLOCKTIME);
if (bt == bs_forkjoin_barrier && TCR_4(__kmp_global.g.g_done))
return;
kmp_info_t **other_threads = team->t.t_threads;
for (size_t thr = start; thr < stop; thr += inc) {
KMP_DEBUG_ASSERT(other_threads[thr]);
int gtid = other_threads[thr]->th.th_info.ds.ds_gtid;
// Wake up worker regardless of if it appears to be sleeping or not
__kmp_atomic_resume_64(gtid, (kmp_atomic_flag_64<> *)NULL);
}
}
static void __kmp_dist_barrier_gather(
enum barrier_type bt, kmp_info_t *this_thr, int gtid, int tid,
void (*reduce)(void *, void *) USE_ITT_BUILD_ARG(void *itt_sync_obj)) {
KMP_TIME_DEVELOPER_PARTITIONED_BLOCK(KMP_dist_gather);
kmp_team_t *team;
distributedBarrier *b;
kmp_info_t **other_threads;
kmp_uint64 my_current_iter, my_next_iter;
kmp_uint32 nproc;
bool group_leader;
team = this_thr->th.th_team;
nproc = this_thr->th.th_team_nproc;
other_threads = team->t.t_threads;
b = team->t.b;
my_current_iter = b->iter[tid].iter;
my_next_iter = (my_current_iter + 1) % distributedBarrier::MAX_ITERS;
group_leader = ((tid % b->threads_per_group) == 0);
KA_TRACE(20,
("__kmp_dist_barrier_gather: T#%d(%d:%d) enter; barrier type %d\n",
gtid, team->t.t_id, tid, bt));
#if USE_ITT_BUILD && USE_ITT_NOTIFY
// Barrier imbalance - save arrive time to the thread
if (__kmp_forkjoin_frames_mode == 3 || __kmp_forkjoin_frames_mode == 2) {
this_thr->th.th_bar_arrive_time = this_thr->th.th_bar_min_time =
__itt_get_timestamp();
}
#endif
if (group_leader) {
// Start from the thread after the group leader
size_t group_start = tid + 1;
size_t group_end = tid + b->threads_per_group;
size_t threads_pending = 0;
if (group_end > nproc)
group_end = nproc;
do { // wait for threads in my group
threads_pending = 0;
// Check all the flags every time to avoid branch misspredict
for (size_t thr = group_start; thr < group_end; thr++) {
// Each thread uses a different cache line
threads_pending += b->flags[my_current_iter][thr].stillNeed;
}
// Execute tasks here
if (__kmp_tasking_mode != tskm_immediate_exec) {
kmp_task_team_t *task_team = this_thr->th.th_task_team;
if (task_team != NULL) {
if (TCR_SYNC_4(task_team->tt.tt_active)) {
if (KMP_TASKING_ENABLED(task_team)) {
int tasks_completed = FALSE;
__kmp_atomic_execute_tasks_64(
this_thr, gtid, (kmp_atomic_flag_64<> *)NULL, FALSE,
&tasks_completed USE_ITT_BUILD_ARG(itt_sync_obj), 0);
} else
this_thr->th.th_reap_state = KMP_SAFE_TO_REAP;
}
} else {
this_thr->th.th_reap_state = KMP_SAFE_TO_REAP;
} // if
}
if (TCR_4(__kmp_global.g.g_done)) {
if (__kmp_global.g.g_abort)
__kmp_abort_thread();
break;
} else if (__kmp_tasking_mode != tskm_immediate_exec &&
this_thr->th.th_reap_state == KMP_SAFE_TO_REAP) {
this_thr->th.th_reap_state = KMP_NOT_SAFE_TO_REAP;
}
} while (threads_pending > 0);
if (reduce) { // Perform reduction if needed
OMPT_REDUCTION_DECL(this_thr, gtid);
OMPT_REDUCTION_BEGIN;
// Group leader reduces all threads in group
for (size_t thr = group_start; thr < group_end; thr++) {
(*reduce)(this_thr->th.th_local.reduce_data,
other_threads[thr]->th.th_local.reduce_data);
}
OMPT_REDUCTION_END;
}
// Set flag for next iteration
b->flags[my_next_iter][tid].stillNeed = 1;
// Each thread uses a different cache line; resets stillNeed to 0 to
// indicate it has reached the barrier
b->flags[my_current_iter][tid].stillNeed = 0;
do { // wait for all group leaders
threads_pending = 0;
for (size_t thr = 0; thr < nproc; thr += b->threads_per_group) {
threads_pending += b->flags[my_current_iter][thr].stillNeed;
}
// Execute tasks here
if (__kmp_tasking_mode != tskm_immediate_exec) {
kmp_task_team_t *task_team = this_thr->th.th_task_team;
if (task_team != NULL) {
if (TCR_SYNC_4(task_team->tt.tt_active)) {
if (KMP_TASKING_ENABLED(task_team)) {
int tasks_completed = FALSE;
__kmp_atomic_execute_tasks_64(
this_thr, gtid, (kmp_atomic_flag_64<> *)NULL, FALSE,
&tasks_completed USE_ITT_BUILD_ARG(itt_sync_obj), 0);
} else
this_thr->th.th_reap_state = KMP_SAFE_TO_REAP;
}
} else {
this_thr->th.th_reap_state = KMP_SAFE_TO_REAP;
} // if
}
if (TCR_4(__kmp_global.g.g_done)) {
if (__kmp_global.g.g_abort)
__kmp_abort_thread();
break;
} else if (__kmp_tasking_mode != tskm_immediate_exec &&
this_thr->th.th_reap_state == KMP_SAFE_TO_REAP) {
this_thr->th.th_reap_state = KMP_NOT_SAFE_TO_REAP;
}
} while (threads_pending > 0);
if (reduce) { // Perform reduction if needed
if (KMP_MASTER_TID(tid)) { // Master reduces over group leaders
OMPT_REDUCTION_DECL(this_thr, gtid);
OMPT_REDUCTION_BEGIN;
for (size_t thr = b->threads_per_group; thr < nproc;
thr += b->threads_per_group) {
(*reduce)(this_thr->th.th_local.reduce_data,
other_threads[thr]->th.th_local.reduce_data);
}
OMPT_REDUCTION_END;
}
}
} else {
// Set flag for next iteration
b->flags[my_next_iter][tid].stillNeed = 1;
// Each thread uses a different cache line; resets stillNeed to 0 to
// indicate it has reached the barrier
b->flags[my_current_iter][tid].stillNeed = 0;
}
KMP_MFENCE();
KA_TRACE(20,
("__kmp_dist_barrier_gather: T#%d(%d:%d) exit for barrier type %d\n",
gtid, team->t.t_id, tid, bt));
}
static void __kmp_dist_barrier_release(
enum barrier_type bt, kmp_info_t *this_thr, int gtid, int tid,
int propagate_icvs USE_ITT_BUILD_ARG(void *itt_sync_obj)) {
KMP_TIME_DEVELOPER_PARTITIONED_BLOCK(KMP_dist_release);
kmp_team_t *team;
distributedBarrier *b;
kmp_bstate_t *thr_bar;
kmp_uint64 my_current_iter, next_go;
size_t my_go_index;
bool group_leader;
KA_TRACE(20, ("__kmp_dist_barrier_release: T#%d(%d) enter; barrier type %d\n",
gtid, tid, bt));
thr_bar = &this_thr->th.th_bar[bt].bb;
if (!KMP_MASTER_TID(tid)) {
// workers and non-master group leaders need to check their presence in team
do {
if (this_thr->th.th_used_in_team.load() != 1 &&
this_thr->th.th_used_in_team.load() != 3) {
// Thread is not in use in a team. Wait on location in tid's thread
// struct. The 0 value tells anyone looking that this thread is spinning
// or sleeping until this location becomes 3 again; 3 is the transition
// state to get to 1 which is waiting on go and being in the team
kmp_flag_32<false, false> my_flag(&(this_thr->th.th_used_in_team), 3);
if (KMP_COMPARE_AND_STORE_ACQ32(&(this_thr->th.th_used_in_team), 2,
0) ||
this_thr->th.th_used_in_team.load() == 0) {
my_flag.wait(this_thr, true USE_ITT_BUILD_ARG(itt_sync_obj));
}
#if USE_ITT_BUILD && USE_ITT_NOTIFY
if ((__itt_sync_create_ptr && itt_sync_obj == NULL) || KMP_ITT_DEBUG) {
// In fork barrier where we could not get the object reliably
itt_sync_obj =
__kmp_itt_barrier_object(gtid, bs_forkjoin_barrier, 0, -1);
// Cancel wait on previous parallel region...
__kmp_itt_task_starting(itt_sync_obj);
if (bt == bs_forkjoin_barrier && TCR_4(__kmp_global.g.g_done))
return;
itt_sync_obj = __kmp_itt_barrier_object(gtid, bs_forkjoin_barrier);
if (itt_sync_obj != NULL)
// Call prepare as early as possible for "new" barrier
__kmp_itt_task_finished(itt_sync_obj);
} else
#endif /* USE_ITT_BUILD && USE_ITT_NOTIFY */
if (bt == bs_forkjoin_barrier && TCR_4(__kmp_global.g.g_done))
return;
}
if (this_thr->th.th_used_in_team.load() != 1 &&
this_thr->th.th_used_in_team.load() != 3) // spurious wake-up?
continue;
if (bt == bs_forkjoin_barrier && TCR_4(__kmp_global.g.g_done))
return;
// At this point, the thread thinks it is in use in a team, or in
// transition to be used in a team, but it might have reached this barrier
// before it was marked unused by the team. Unused threads are awoken and
// shifted to wait on local thread struct elsewhere. It also might reach
// this point by being picked up for use by a different team. Either way,
// we need to update the tid.
tid = __kmp_tid_from_gtid(gtid);
team = this_thr->th.th_team;
KMP_DEBUG_ASSERT(tid >= 0);
KMP_DEBUG_ASSERT(team);
b = team->t.b;
my_current_iter = b->iter[tid].iter;
next_go = my_current_iter + distributedBarrier::MAX_ITERS;
my_go_index = tid / b->threads_per_go;
if (this_thr->th.th_used_in_team.load() == 3) {
KMP_COMPARE_AND_STORE_ACQ32(&(this_thr->th.th_used_in_team), 3, 1);
}
// Check if go flag is set
if (b->go[my_go_index].go.load() != next_go) {
// Wait on go flag on team
kmp_atomic_flag_64<false, true> my_flag(
&(b->go[my_go_index].go), next_go, &(b->sleep[tid].sleep));
my_flag.wait(this_thr, true USE_ITT_BUILD_ARG(itt_sync_obj));
KMP_DEBUG_ASSERT(my_current_iter == b->iter[tid].iter ||
b->iter[tid].iter == 0);
KMP_DEBUG_ASSERT(b->sleep[tid].sleep == false);
}
if (bt == bs_forkjoin_barrier && TCR_4(__kmp_global.g.g_done))
return;
// At this point, the thread's go location was set. This means the primary
// thread is safely in the barrier, and so this thread's data is
// up-to-date, but we should check again that this thread is really in
// use in the team, as it could have been woken up for the purpose of
// changing team size, or reaping threads at shutdown.
if (this_thr->th.th_used_in_team.load() == 1)
break;
} while (1);
if (bt == bs_forkjoin_barrier && TCR_4(__kmp_global.g.g_done))
return;
group_leader = ((tid % b->threads_per_group) == 0);
if (group_leader) {
// Tell all the threads in my group they can go!
for (size_t go_idx = my_go_index + 1;
go_idx < my_go_index + b->gos_per_group; go_idx++) {
b->go[go_idx].go.store(next_go);
}
// Fence added so that workers can see changes to go. sfence inadequate.
KMP_MFENCE();
}
#if KMP_BARRIER_ICV_PUSH
if (propagate_icvs) { // copy ICVs to final dest
__kmp_init_implicit_task(team->t.t_ident, team->t.t_threads[tid], team,
tid, FALSE);
copy_icvs(&team->t.t_implicit_task_taskdata[tid].td_icvs,
(kmp_internal_control_t *)team->t.b->team_icvs);
copy_icvs(&thr_bar->th_fixed_icvs,
&team->t.t_implicit_task_taskdata[tid].td_icvs);
}
#endif
if (__kmp_dflt_blocktime != KMP_MAX_BLOCKTIME && group_leader) {
// This thread is now awake and participating in the barrier;
// wake up the other threads in the group
size_t nproc = this_thr->th.th_team_nproc;
size_t group_end = tid + b->threads_per_group;
if (nproc < group_end)
group_end = nproc;
__kmp_dist_barrier_wakeup(bt, team, tid + 1, group_end, 1, tid);
}
} else { // Primary thread
team = this_thr->th.th_team;
b = team->t.b;
my_current_iter = b->iter[tid].iter;
next_go = my_current_iter + distributedBarrier::MAX_ITERS;
#if KMP_BARRIER_ICV_PUSH
if (propagate_icvs) {
// primary thread has ICVs in final destination; copy
copy_icvs(&thr_bar->th_fixed_icvs,
&team->t.t_implicit_task_taskdata[tid].td_icvs);
}
#endif
// Tell all the group leaders they can go!
for (size_t go_idx = 0; go_idx < b->num_gos; go_idx += b->gos_per_group) {
b->go[go_idx].go.store(next_go);
}
if (__kmp_dflt_blocktime != KMP_MAX_BLOCKTIME) {
// Wake-up the group leaders
size_t nproc = this_thr->th.th_team_nproc;
__kmp_dist_barrier_wakeup(bt, team, tid + b->threads_per_group, nproc,
b->threads_per_group, tid);
}
// Tell all the threads in my group they can go!
for (size_t go_idx = 1; go_idx < b->gos_per_group; go_idx++) {
b->go[go_idx].go.store(next_go);
}
// Fence added so that workers can see changes to go. sfence inadequate.
KMP_MFENCE();
if (__kmp_dflt_blocktime != KMP_MAX_BLOCKTIME) {
// Wake-up the other threads in my group
size_t nproc = this_thr->th.th_team_nproc;
size_t group_end = tid + b->threads_per_group;
if (nproc < group_end)
group_end = nproc;
__kmp_dist_barrier_wakeup(bt, team, tid + 1, group_end, 1, tid);
}
}
// Update to next iteration
KMP_ASSERT(my_current_iter == b->iter[tid].iter);
b->iter[tid].iter = (b->iter[tid].iter + 1) % distributedBarrier::MAX_ITERS;
KA_TRACE(
20, ("__kmp_dist_barrier_release: T#%d(%d:%d) exit for barrier type %d\n",
gtid, team->t.t_id, tid, bt));
}
// Linear Barrier
template <bool cancellable = false>
@ -1354,6 +1866,11 @@ static int __kmp_barrier_template(enum barrier_type bt, int gtid, int is_split,
bt, this_thr, gtid, tid, reduce USE_ITT_BUILD_ARG(itt_sync_obj));
} else {
switch (__kmp_barrier_gather_pattern[bt]) {
case bp_dist_bar: {
__kmp_dist_barrier_gather(bt, this_thr, gtid, tid,
reduce USE_ITT_BUILD_ARG(itt_sync_obj));
break;
}
case bp_hyper_bar: {
// don't set branch bits to 0; use linear
KMP_ASSERT(__kmp_barrier_gather_branch_bits[bt]);
@ -1467,6 +1984,12 @@ static int __kmp_barrier_template(enum barrier_type bt, int gtid, int is_split,
bt, this_thr, gtid, tid, FALSE USE_ITT_BUILD_ARG(itt_sync_obj));
} else {
switch (__kmp_barrier_release_pattern[bt]) {
case bp_dist_bar: {
KMP_ASSERT(__kmp_barrier_release_branch_bits[bt]);
__kmp_dist_barrier_release(bt, this_thr, gtid, tid,
FALSE USE_ITT_BUILD_ARG(itt_sync_obj));
break;
}
case bp_hyper_bar: {
KMP_ASSERT(__kmp_barrier_release_branch_bits[bt]);
__kmp_hyper_barrier_release(bt, this_thr, gtid, tid,
@ -1596,6 +2119,11 @@ void __kmp_end_split_barrier(enum barrier_type bt, int gtid) {
if (!team->t.t_serialized) {
if (KMP_MASTER_GTID(gtid)) {
switch (__kmp_barrier_release_pattern[bt]) {
case bp_dist_bar: {
__kmp_dist_barrier_release(bt, this_thr, gtid, tid,
FALSE USE_ITT_BUILD_ARG(NULL));
break;
}
case bp_hyper_bar: {
KMP_ASSERT(__kmp_barrier_release_branch_bits[bt]);
__kmp_hyper_barrier_release(bt, this_thr, gtid, tid,
@ -1705,8 +2233,8 @@ void __kmp_join_barrier(int gtid) {
if (__kmp_tasking_mode == tskm_extra_barrier) {
__kmp_tasking_barrier(team, this_thr, gtid);
KA_TRACE(10, ("__kmp_join_barrier: T#%d(%d:%d) past taking barrier\n", gtid,
team_id, tid));
KA_TRACE(10, ("__kmp_join_barrier: T#%d(%d:%d) past tasking barrier\n",
gtid, team_id, tid));
}
#ifdef KMP_DEBUG
if (__kmp_tasking_mode != tskm_immediate_exec) {
@ -1715,8 +2243,9 @@ void __kmp_join_barrier(int gtid) {
__kmp_gtid_from_thread(this_thr), team_id,
team->t.t_task_team[this_thr->th.th_task_state],
this_thr->th.th_task_team));
KMP_DEBUG_ASSERT(this_thr->th.th_task_team ==
team->t.t_task_team[this_thr->th.th_task_state]);
if (this_thr->th.th_task_team)
KMP_DEBUG_ASSERT(this_thr->th.th_task_team ==
team->t.t_task_team[this_thr->th.th_task_state]);
}
#endif /* KMP_DEBUG */
@ -1742,6 +2271,11 @@ void __kmp_join_barrier(int gtid) {
#endif /* USE_ITT_BUILD */
switch (__kmp_barrier_gather_pattern[bs_forkjoin_barrier]) {
case bp_dist_bar: {
__kmp_dist_barrier_gather(bs_forkjoin_barrier, this_thr, gtid, tid,
NULL USE_ITT_BUILD_ARG(itt_sync_obj));
break;
}
case bp_hyper_bar: {
KMP_ASSERT(__kmp_barrier_gather_branch_bits[bs_forkjoin_barrier]);
__kmp_hyper_barrier_gather(bs_forkjoin_barrier, this_thr, gtid, tid,
@ -1787,8 +2321,7 @@ void __kmp_join_barrier(int gtid) {
team_thread->th.th_stats->setIdleFlag();
if (__kmp_dflt_blocktime != KMP_MAX_BLOCKTIME &&
team_thread->th.th_sleep_loc != NULL)
__kmp_null_resume_wrapper(__kmp_gtid_from_thread(team_thread),
team_thread->th.th_sleep_loc);
__kmp_null_resume_wrapper(team_thread);
}
#endif
#if USE_ITT_BUILD
@ -1933,6 +2466,11 @@ void __kmp_fork_barrier(int gtid, int tid) {
} // primary thread
switch (__kmp_barrier_release_pattern[bs_forkjoin_barrier]) {
case bp_dist_bar: {
__kmp_dist_barrier_release(bs_forkjoin_barrier, this_thr, gtid, tid,
TRUE USE_ITT_BUILD_ARG(NULL));
break;
}
case bp_hyper_bar: {
KMP_ASSERT(__kmp_barrier_release_branch_bits[bs_forkjoin_barrier]);
__kmp_hyper_barrier_release(bs_forkjoin_barrier, this_thr, gtid, tid,

View File

@ -0,0 +1,141 @@
/*
* kmp_barrier.h
*/
//===----------------------------------------------------------------------===//
//
// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
// See https://llvm.org/LICENSE.txt for license information.
// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
//
//===----------------------------------------------------------------------===//
#ifndef KMP_BARRIER_H
#define KMP_BARRIER_H
#include "kmp.h"
#include "kmp_i18n.h"
#if KMP_HAVE_XMMINTRIN_H && KMP_HAVE__MM_MALLOC
#include <xmmintrin.h>
#define KMP_ALIGNED_ALLOCATE(size, alignment) _mm_malloc(size, alignment)
#define KMP_ALIGNED_FREE(ptr) _mm_free(ptr)
#elif KMP_HAVE_ALIGNED_ALLOC
#define KMP_ALIGNED_ALLOCATE(size, alignment) aligned_alloc(alignment, size)
#define KMP_ALIGNED_FREE(ptr) free(ptr)
#elif KMP_HAVE_POSIX_MEMALIGN
static inline void *KMP_ALIGNED_ALLOCATE(size_t size, size_t alignment) {
void *ptr;
int n = posix_memalign(&ptr, alignment, size);
if (n != 0) {
if (ptr)
free(ptr);
return nullptr;
}
return ptr;
}
#define KMP_ALIGNED_FREE(ptr) free(ptr)
#elif KMP_HAVE__ALIGNED_MALLOC
#include <malloc.h>
#define KMP_ALIGNED_ALLOCATE(size, alignment) _aligned_malloc(size, alignment)
#define KMP_ALIGNED_FREE(ptr) _aligned_free(ptr)
#else
#define KMP_ALIGNED_ALLOCATE(size, alignment) KMP_INTERNAL_MALLOC(size)
#define KMP_ALIGNED_FREE(ptr) KMP_INTERNAL_FREE(ptr)
#endif
// Use four cache lines: MLC tends to prefetch the next or previous cache line
// creating a possible fake conflict between cores, so this is the only way to
// guarantee that no such prefetch can happen.
#ifndef KMP_FOURLINE_ALIGN_CACHE
#define KMP_FOURLINE_ALIGN_CACHE KMP_ALIGN(4 * CACHE_LINE)
#endif
#define KMP_OPTIMIZE_FOR_REDUCTIONS 0
class distributedBarrier {
struct flags_s {
kmp_uint32 volatile KMP_FOURLINE_ALIGN_CACHE stillNeed;
};
struct go_s {
std::atomic<kmp_uint64> KMP_FOURLINE_ALIGN_CACHE go;
};
struct iter_s {
kmp_uint64 volatile KMP_FOURLINE_ALIGN_CACHE iter;
};
struct sleep_s {
std::atomic<bool> KMP_FOURLINE_ALIGN_CACHE sleep;
};
void init(size_t nthr);
void resize(size_t nthr);
void computeGo(size_t n);
void computeVarsForN(size_t n);
public:
enum {
MAX_ITERS = 3,
MAX_GOS = 8,
IDEAL_GOS = 4,
IDEAL_CONTENTION = 16,
};
flags_s *flags[MAX_ITERS];
go_s *go;
iter_s *iter;
sleep_s *sleep;
size_t KMP_ALIGN_CACHE num_threads; // number of threads in barrier
size_t KMP_ALIGN_CACHE max_threads; // size of arrays in data structure
// number of go signals each requiring one write per iteration
size_t KMP_ALIGN_CACHE num_gos;
// number of groups of gos
size_t KMP_ALIGN_CACHE num_groups;
// threads per go signal
size_t KMP_ALIGN_CACHE threads_per_go;
bool KMP_ALIGN_CACHE fix_threads_per_go;
// threads per group
size_t KMP_ALIGN_CACHE threads_per_group;
// number of go signals in a group
size_t KMP_ALIGN_CACHE gos_per_group;
void *team_icvs;
distributedBarrier() = delete;
~distributedBarrier() = delete;
// Used instead of constructor to create aligned data
static distributedBarrier *allocate(int nThreads) {
distributedBarrier *d = (distributedBarrier *)KMP_ALIGNED_ALLOCATE(
sizeof(distributedBarrier), 4 * CACHE_LINE);
if (!d) {
KMP_FATAL(MemoryAllocFailed);
}
d->num_threads = 0;
d->max_threads = 0;
for (int i = 0; i < MAX_ITERS; ++i)
d->flags[i] = NULL;
d->go = NULL;
d->iter = NULL;
d->sleep = NULL;
d->team_icvs = NULL;
d->fix_threads_per_go = false;
// calculate gos and groups ONCE on base size
d->computeGo(nThreads);
d->init(nThreads);
return d;
}
static void deallocate(distributedBarrier *db) { KMP_ALIGNED_FREE(db); }
void update_num_threads(size_t nthr) { init(nthr); }
bool need_resize(size_t new_nthr) { return (new_nthr > max_threads); }
size_t get_num_threads() { return num_threads; }
kmp_uint64 go_release();
void go_reset();
};
#endif // KMP_BARRIER_H

View File

@ -84,6 +84,16 @@
#define KMP_HAVE_ATTRIBUTE_RTM LIBOMP_HAVE_ATTRIBUTE_RTM
#cmakedefine01 LIBOMP_ARCH_AARCH64_A64FX
#define KMP_ARCH_AARCH64_A64FX LIBOMP_ARCH_AARCH64_A64FX
#cmakedefine01 LIBOMP_HAVE_XMMINTRIN_H
#define KMP_HAVE_XMMINTRIN_H LIBOMP_HAVE_XMMINTRIN_H
#cmakedefine01 LIBOMP_HAVE__MM_MALLOC
#define KMP_HAVE__MM_MALLOC LIBOMP_HAVE__MM_MALLOC
#cmakedefine01 LIBOMP_HAVE_ALIGNED_ALLOC
#define KMP_HAVE_ALIGNED_ALLOC LIBOMP_HAVE_ALIGNED_ALLOC
#cmakedefine01 LIBOMP_HAVE_POSIX_MEMALIGN
#define KMP_HAVE_POSIX_MEMALIGN LIBOMP_HAVE_POSIX_MEMALIGN
#cmakedefine01 LIBOMP_HAVE__ALIGNED_MALLOC
#define KMP_HAVE__ALIGNED_MALLOC LIBOMP_HAVE__ALIGNED_MALLOC
// Configured cache line based on architecture
#if KMP_ARCH_PPC64

View File

@ -110,8 +110,8 @@ char const *__kmp_barrier_type_name[bs_last_barrier] = {"plain", "forkjoin"
"reduction"
#endif // KMP_FAST_REDUCTION_BARRIER
};
char const *__kmp_barrier_pattern_name[bp_last_bar] = {"linear", "tree",
"hyper", "hierarchical"};
char const *__kmp_barrier_pattern_name[bp_last_bar] = {
"linear", "tree", "hyper", "hierarchical", "dist"};
int __kmp_allThreadsSpecified = 0;
size_t __kmp_align_alloc = CACHE_LINE;

View File

@ -1025,6 +1025,30 @@ extern kmp_real64 __kmp_xchg_real64(volatile kmp_real64 *p, kmp_real64 v);
#define KMP_MB() /* nothing to do */
#endif
#if KMP_ARCH_X86 || KMP_ARCH_X86_64
#if KMP_COMPILER_ICC
#define KMP_MFENCE_() _mm_mfence()
#define KMP_SFENCE_() _mm_sfence()
#elif KMP_COMPILER_MSVC
#define KMP_MFENCE_() MemoryBarrier()
#define KMP_SFENCE_() MemoryBarrier()
#else
#define KMP_MFENCE_() __sync_synchronize()
#define KMP_SFENCE_() __sync_synchronize()
#endif
#define KMP_MFENCE() \
if (UNLIKELY(!__kmp_cpuinfo.initialized)) { \
__kmp_query_cpuid(&__kmp_cpuinfo); \
} \
if (__kmp_cpuinfo.sse2) { \
KMP_MFENCE_(); \
}
#define KMP_SFENCE() KMP_SFENCE_()
#else
#define KMP_MFENCE() KMP_MB()
#define KMP_SFENCE() KMP_MB()
#endif
#ifndef KMP_IMB
#define KMP_IMB() /* nothing to do */
#endif

View File

@ -107,6 +107,10 @@ static int __kmp_unregister_root_other_thread(int gtid);
static void __kmp_reap_thread(kmp_info_t *thread, int is_root);
kmp_info_t *__kmp_thread_pool_insert_pt = NULL;
void __kmp_resize_dist_barrier(kmp_team_t *team, int old_nthreads,
int new_nthreads);
void __kmp_add_threads_to_team(kmp_team_t *team, int new_nthreads);
/* Calculate the identifier of the current thread */
/* fast (and somewhat portable) way to get unique identifier of executing
thread. Returns KMP_GTID_DNE if we haven't been assigned a gtid. */
@ -1204,7 +1208,7 @@ void __kmp_serialized_parallel(ident_t *loc, kmp_int32 global_tid) {
this_thr->th.th_team = serial_team;
serial_team->t.t_master_tid = this_thr->th.th_info.ds.ds_tid;
KF_TRACE(10, ("__kmpc_serialized_parallel: T#d curtask=%p\n", global_tid,
KF_TRACE(10, ("__kmpc_serialized_parallel: T#%d curtask=%p\n", global_tid,
this_thr->th.th_current_task));
KMP_ASSERT(this_thr->th.th_current_task->td_flags.executing == 1);
this_thr->th.th_current_task->td_flags.executing = 0;
@ -1563,15 +1567,24 @@ int __kmp_fork_call(ident_t *loc, int gtid,
/* Change number of threads in the team if requested */
if (master_set_numthreads) { // The parallel has num_threads clause
if (master_set_numthreads < master_th->th.th_teams_size.nth) {
if (master_set_numthreads <= master_th->th.th_teams_size.nth) {
// AC: only can reduce number of threads dynamically, can't increase
kmp_info_t **other_threads = parent_team->t.t_threads;
// NOTE: if using distributed barrier, we need to run this code block
// even when the team size appears not to have changed from the max.
int old_proc = master_th->th.th_teams_size.nth;
if (__kmp_barrier_release_pattern[bs_forkjoin_barrier] ==
bp_dist_bar) {
__kmp_resize_dist_barrier(parent_team, old_proc,
master_set_numthreads);
__kmp_add_threads_to_team(parent_team, master_set_numthreads);
}
parent_team->t.t_nproc = master_set_numthreads;
for (i = 0; i < master_set_numthreads; ++i) {
other_threads[i]->th.th_team_nproc = master_set_numthreads;
}
// Keep extra threads hot in the team for possible next parallels
}
// Keep extra threads hot in the team for possible next parallels
master_th->th.th_set_nproc = 0;
}
@ -1635,6 +1648,9 @@ int __kmp_fork_call(ident_t *loc, int gtid,
}
#endif
// Need this to happen before we determine the number of threads, not while
// we are allocating the team
//__kmp_push_current_task_to_thread(master_th, parent_team, 0);
int enter_teams = 0;
if (parent_team->t.t_active_level >=
master_th->th.th_current_task->td_icvs.max_active_levels) {
@ -1642,13 +1658,10 @@ int __kmp_fork_call(ident_t *loc, int gtid,
} else {
enter_teams = ((ap == NULL && active_level == 0) ||
(ap && teams_level > 0 && teams_level == level));
nthreads =
master_set_numthreads
? master_set_numthreads
: get__nproc_2(
parent_team,
master_tid); // TODO: get nproc directly from current task
nthreads = master_set_numthreads
? master_set_numthreads
// TODO: get nproc directly from current task
: get__nproc_2(parent_team, master_tid);
// Check if we need to take forkjoin lock? (no need for serialized
// parallel out of teams construct). This code moved here from
// __kmp_reserve_threads() to speedup nested serialized parallels.
@ -1983,6 +1996,8 @@ int __kmp_fork_call(ident_t *loc, int gtid,
#endif
proc_bind, &new_icvs,
argc USE_NESTED_HOT_ARG(master_th));
if (__kmp_barrier_release_pattern[bs_forkjoin_barrier] == bp_dist_bar)
copy_icvs((kmp_internal_control_t *)team->t.b->team_icvs, &new_icvs);
} else {
/* allocate a new parallel team */
KF_TRACE(10, ("__kmp_fork_call: before __kmp_allocate_team\n"));
@ -1993,6 +2008,9 @@ int __kmp_fork_call(ident_t *loc, int gtid,
proc_bind,
&master_th->th.th_current_task->td_icvs,
argc USE_NESTED_HOT_ARG(master_th));
if (__kmp_barrier_release_pattern[bs_forkjoin_barrier] == bp_dist_bar)
copy_icvs((kmp_internal_control_t *)team->t.b->team_icvs,
&master_th->th.th_current_task->td_icvs);
}
KF_TRACE(
10, ("__kmp_fork_call: after __kmp_allocate_team - team = %p\n", team));
@ -2359,6 +2377,12 @@ void __kmp_join_call(ident_t *loc, int gtid
parent_team->t.t_stack_id = NULL;
}
#endif
if (team->t.t_nproc > 1 &&
__kmp_barrier_gather_pattern[bs_forkjoin_barrier] == bp_dist_bar) {
team->t.b->update_num_threads(team->t.t_nproc);
__kmp_add_threads_to_team(team, team->t.t_nproc);
}
}
KMP_MB();
@ -2646,6 +2670,9 @@ void __kmp_set_num_threads(int new_nth, int gtid) {
__kmp_acquire_bootstrap_lock(&__kmp_forkjoin_lock);
if (__kmp_barrier_release_pattern[bs_forkjoin_barrier] == bp_dist_bar) {
__kmp_resize_dist_barrier(hot_team, hot_team->t.t_nproc, new_nth);
}
// Release the extra threads we don't need any more.
for (f = new_nth; f < hot_team->t.t_nproc; f++) {
KMP_DEBUG_ASSERT(hot_team->t.t_threads[f] != NULL);
@ -2665,6 +2692,11 @@ void __kmp_set_num_threads(int new_nth, int gtid) {
}
#endif
if (__kmp_barrier_release_pattern[bs_forkjoin_barrier] == bp_dist_bar) {
hot_team->t.b->update_num_threads(new_nth);
__kmp_add_threads_to_team(hot_team, new_nth);
}
__kmp_release_bootstrap_lock(&__kmp_forkjoin_lock);
// Update the t_nproc field in the threads that are still active.
@ -4112,7 +4144,6 @@ static void __kmp_initialize_info(kmp_info_t *this_thr, kmp_team_t *team,
this_thr->th.th_team_nproc = team->t.t_nproc;
this_thr->th.th_team_master = master;
this_thr->th.th_team_serialized = team->t.t_serialized;
TCW_PTR(this_thr->th.th_sleep_loc, NULL);
KMP_DEBUG_ASSERT(team->t.t_implicit_task_taskdata);
@ -4281,6 +4312,12 @@ kmp_info_t *__kmp_allocate_thread(kmp_root_t *root, kmp_team_t *team,
new_thr->th.th_task_state_top = 0;
new_thr->th.th_task_state_stack_sz = 4;
if (__kmp_barrier_gather_pattern[bs_forkjoin_barrier] == bp_dist_bar) {
// Make sure pool thread has transitioned to waiting on own thread struct
KMP_DEBUG_ASSERT(new_thr->th.th_used_in_team.load() == 0);
// Thread activated in __kmp_allocate_team when increasing team size
}
#ifdef KMP_ADJUST_BLOCKTIME
/* Adjust blocktime back to zero if necessary */
/* Middle initialization might not have occurred yet */
@ -4448,6 +4485,9 @@ kmp_info_t *__kmp_allocate_thread(kmp_root_t *root, kmp_team_t *team,
balign[b].bb.use_oncore_barrier = 0;
}
TCW_PTR(new_thr->th.th_sleep_loc, NULL);
new_thr->th.th_sleep_loc_type = flag_unset;
new_thr->th.th_spin_here = FALSE;
new_thr->th.th_next_waiting = 0;
#if KMP_OS_UNIX
@ -5027,6 +5067,13 @@ __kmp_allocate_team(kmp_root_t *root, int new_nproc, int max_nproc,
}
#endif
if (team->t.t_nproc != new_nproc &&
__kmp_barrier_release_pattern[bs_forkjoin_barrier] == bp_dist_bar) {
// Distributed barrier may need a resize
int old_nthr = team->t.t_nproc;
__kmp_resize_dist_barrier(team, old_nthr, new_nproc);
}
// Has the number of threads changed?
/* Let's assume the most common case is that the number of threads is
unchanged, and put that case first. */
@ -5076,6 +5123,11 @@ __kmp_allocate_team(kmp_root_t *root, int new_nproc, int max_nproc,
new_nproc));
team->t.t_size_changed = 1;
if (__kmp_barrier_release_pattern[bs_forkjoin_barrier] == bp_dist_bar) {
// Barrier size already reduced earlier in this function
// Activate team threads via th_used_in_team
__kmp_add_threads_to_team(team, new_nproc);
}
#if KMP_NESTED_HOT_TEAMS
if (__kmp_hot_teams_mode == 0) {
// AC: saved number of threads should correspond to team's value in this
@ -5152,7 +5204,7 @@ __kmp_allocate_team(kmp_root_t *root, int new_nproc, int max_nproc,
KA_TRACE(20,
("__kmp_allocate_team: increasing hot team thread count to %d\n",
new_nproc));
int old_nproc = team->t.t_nproc; // save old value and use to update only
team->t.t_size_changed = 1;
#if KMP_NESTED_HOT_TEAMS
@ -5179,10 +5231,9 @@ __kmp_allocate_team(kmp_root_t *root, int new_nproc, int max_nproc,
KMP_DEBUG_ASSERT(__kmp_hot_teams_mode == 1);
team->t.t_nproc = new_nproc; // just get reserved threads involved
} else {
// we may have some threads in reserve, but not enough
team->t.t_nproc =
hot_teams[level]
.hot_team_nth; // get reserved threads involved if any
// We may have some threads in reserve, but not enough;
// get reserved threads involved if any.
team->t.t_nproc = hot_teams[level].hot_team_nth;
hot_teams[level].hot_team_nth = new_nproc; // adjust hot team max size
#endif // KMP_NESTED_HOT_TEAMS
if (team->t.t_max_nproc < new_nproc) {
@ -5237,8 +5288,12 @@ __kmp_allocate_team(kmp_root_t *root, int new_nproc, int max_nproc,
#if KMP_NESTED_HOT_TEAMS
} // end of check of t_nproc vs. new_nproc vs. hot_team_nth
#endif // KMP_NESTED_HOT_TEAMS
if (__kmp_barrier_release_pattern[bs_forkjoin_barrier] == bp_dist_bar) {
// Barrier size already increased earlier in this function
// Activate team threads via th_used_in_team
__kmp_add_threads_to_team(team, new_nproc);
}
/* make sure everyone is syncronized */
int old_nproc = team->t.t_nproc; // save old value and use to update only
// new threads below
__kmp_initialize_team(team, new_nproc, new_icvs,
root->r.r_uber_thread->th.th_ident);
@ -5342,6 +5397,13 @@ __kmp_allocate_team(kmp_root_t *root, int new_nproc, int max_nproc,
/* take this team from the team pool */
__kmp_team_pool = team->t.t_next_pool;
if (max_nproc > 1 &&
__kmp_barrier_gather_pattern[bs_forkjoin_barrier] == bp_dist_bar) {
if (!team->t.b) { // Allocate barrier structure
team->t.b = distributedBarrier::allocate(__kmp_dflt_team_nth_ub);
}
}
/* setup the team for fresh use */
__kmp_initialize_team(team, new_nproc, new_icvs, NULL);
@ -5397,6 +5459,12 @@ __kmp_allocate_team(kmp_root_t *root, int new_nproc, int max_nproc,
/* and set it up */
team->t.t_max_nproc = max_nproc;
if (max_nproc > 1 &&
__kmp_barrier_gather_pattern[bs_forkjoin_barrier] == bp_dist_bar) {
// Allocate barrier structure
team->t.b = distributedBarrier::allocate(__kmp_dflt_team_nth_ub);
}
/* NOTE well, for some reason allocating one big buffer and dividing it up
seems to really hurt performance a lot on the P4, so, let's not use this */
__kmp_allocate_team_arrays(team, max_nproc);
@ -5553,10 +5621,43 @@ void __kmp_free_team(kmp_root_t *root,
/* free the worker threads */
for (f = 1; f < team->t.t_nproc; ++f) {
KMP_DEBUG_ASSERT(team->t.t_threads[f]);
if (__kmp_barrier_gather_pattern[bs_forkjoin_barrier] == bp_dist_bar) {
KMP_COMPARE_AND_STORE_ACQ32(&(team->t.t_threads[f]->th.th_used_in_team),
1, 2);
}
__kmp_free_thread(team->t.t_threads[f]);
}
if (__kmp_barrier_gather_pattern[bs_forkjoin_barrier] == bp_dist_bar) {
if (team->t.b) {
// wake up thread at old location
team->t.b->go_release();
if (__kmp_dflt_blocktime != KMP_MAX_BLOCKTIME) {
for (f = 1; f < team->t.t_nproc; ++f) {
if (team->t.b->sleep[f].sleep) {
__kmp_atomic_resume_64(
team->t.t_threads[f]->th.th_info.ds.ds_gtid,
(kmp_atomic_flag_64<> *)NULL);
}
}
}
// Wait for threads to be removed from team
for (int f = 1; f < team->t.t_nproc; ++f) {
while (team->t.t_threads[f]->th.th_used_in_team.load() != 0)
KMP_CPU_PAUSE();
}
}
}
for (f = 1; f < team->t.t_nproc; ++f) {
team->t.t_threads[f] = NULL;
}
if (team->t.t_max_nproc > 1 &&
__kmp_barrier_gather_pattern[bs_forkjoin_barrier] == bp_dist_bar) {
distributedBarrier::deallocate(team->t.b);
team->t.b = NULL;
}
/* put the team back in the team pool */
/* TODO limit size of team pool, call reap_team if pool too large */
team->t.t_next_pool = CCAST(kmp_team_t *, __kmp_team_pool);
@ -5955,11 +6056,18 @@ static void __kmp_reap_thread(kmp_info_t *thread, int is_root) {
KA_TRACE(
20, ("__kmp_reap_thread: releasing T#%d from fork barrier for reap\n",
gtid));
/* Need release fence here to prevent seg faults for tree forkjoin barrier
* (GEH) */
kmp_flag_64<> flag(&thread->th.th_bar[bs_forkjoin_barrier].bb.b_go,
thread);
__kmp_release_64(&flag);
if (__kmp_barrier_gather_pattern[bs_forkjoin_barrier] == bp_dist_bar) {
while (
!KMP_COMPARE_AND_STORE_ACQ32(&(thread->th.th_used_in_team), 0, 3))
KMP_CPU_PAUSE();
__kmp_resume_32(gtid, (kmp_flag_32<false, false> *)NULL);
} else {
/* Need release fence here to prevent seg faults for tree forkjoin
barrier (GEH) */
kmp_flag_64<> flag(&thread->th.th_bar[bs_forkjoin_barrier].bb.b_go,
thread);
__kmp_release_64(&flag);
}
}
// Terminate OS thread.
@ -6844,8 +6952,8 @@ static void __kmp_do_serial_initialize(void) {
#if KMP_FAST_REDUCTION_BARRIER
#define kmp_reduction_barrier_gather_bb ((int)1)
#define kmp_reduction_barrier_release_bb ((int)1)
#define kmp_reduction_barrier_gather_pat bp_hyper_bar
#define kmp_reduction_barrier_release_pat bp_hyper_bar
#define kmp_reduction_barrier_gather_pat __kmp_barrier_gather_pat_dflt
#define kmp_reduction_barrier_release_pat __kmp_barrier_release_pat_dflt
#endif // KMP_FAST_REDUCTION_BARRIER
for (i = bs_plain_barrier; i < bs_last_barrier; i++) {
__kmp_barrier_gather_branch_bits[i] = __kmp_barrier_gather_bb_dflt;
@ -8702,6 +8810,96 @@ void __kmp_omp_display_env(int verbose) {
__kmp_release_bootstrap_lock(&__kmp_initz_lock);
}
// The team size is changing, so distributed barrier must be modified
void __kmp_resize_dist_barrier(kmp_team_t *team, int old_nthreads,
int new_nthreads) {
KMP_DEBUG_ASSERT(__kmp_barrier_release_pattern[bs_forkjoin_barrier] ==
bp_dist_bar);
kmp_info_t **other_threads = team->t.t_threads;
// We want all the workers to stop waiting on the barrier while we adjust the
// size of the team.
for (int f = 1; f < old_nthreads; ++f) {
KMP_DEBUG_ASSERT(other_threads[f] != NULL);
// Ignore threads that are already inactive or not present in the team
if (team->t.t_threads[f]->th.th_used_in_team.load() == 0) {
// teams construct causes thread_limit to get passed in, and some of
// those could be inactive; just ignore them
continue;
}
// If thread is transitioning still to in_use state, wait for it
if (team->t.t_threads[f]->th.th_used_in_team.load() == 3) {
while (team->t.t_threads[f]->th.th_used_in_team.load() == 3)
KMP_CPU_PAUSE();
}
// The thread should be in_use now
KMP_DEBUG_ASSERT(team->t.t_threads[f]->th.th_used_in_team.load() == 1);
// Transition to unused state
team->t.t_threads[f]->th.th_used_in_team.store(2);
KMP_DEBUG_ASSERT(team->t.t_threads[f]->th.th_used_in_team.load() == 2);
}
// Release all the workers
kmp_uint64 new_value; // new value for go
new_value = team->t.b->go_release();
KMP_MFENCE();
// Workers should see transition status 2 and move to 0; but may need to be
// woken up first
size_t my_go_index;
int count = old_nthreads - 1;
while (count > 0) {
count = old_nthreads - 1;
for (int f = 1; f < old_nthreads; ++f) {
my_go_index = f / team->t.b->threads_per_go;
if (other_threads[f]->th.th_used_in_team.load() != 0) {
if (__kmp_dflt_blocktime != KMP_MAX_BLOCKTIME) { // Wake up the workers
kmp_atomic_flag_64<> *flag = (kmp_atomic_flag_64<> *)CCAST(
void *, other_threads[f]->th.th_sleep_loc);
__kmp_atomic_resume_64(other_threads[f]->th.th_info.ds.ds_gtid, flag);
}
} else {
KMP_DEBUG_ASSERT(team->t.t_threads[f]->th.th_used_in_team.load() == 0);
count--;
}
}
}
// Now update the barrier size
team->t.b->update_num_threads(new_nthreads);
team->t.b->go_reset();
}
void __kmp_add_threads_to_team(kmp_team_t *team, int new_nthreads) {
// Add the threads back to the team
KMP_DEBUG_ASSERT(team);
// Threads were paused and pointed at th_used_in_team temporarily during a
// resize of the team. We're going to set th_used_in_team to 3 to indicate to
// the thread that it should transition itself back into the team. Then, if
// blocktime isn't infinite, the thread could be sleeping, so we send a resume
// to wake it up.
for (int f = 1; f < new_nthreads; ++f) {
KMP_DEBUG_ASSERT(team->t.t_threads[f]);
KMP_COMPARE_AND_STORE_ACQ32(&(team->t.t_threads[f]->th.th_used_in_team), 0,
3);
if (__kmp_dflt_blocktime != KMP_MAX_BLOCKTIME) { // Wake up sleeping threads
__kmp_resume_32(team->t.t_threads[f]->th.th_info.ds.ds_gtid,
(kmp_flag_32<false, false> *)NULL);
}
}
// The threads should be transitioning to the team; when they are done, they
// should have set th_used_in_team to 1. This loop forces master to wait until
// all threads have moved into the team and are waiting in the barrier.
int count = new_nthreads - 1;
while (count > 0) {
count = new_nthreads - 1;
for (int f = 1; f < new_nthreads; ++f) {
if (team->t.t_threads[f]->th.th_used_in_team.load() == 1) {
count--;
}
}
}
}
// Globals and functions for hidden helper task
kmp_info_t **__kmp_hidden_helper_threads;
kmp_info_t *__kmp_hidden_helper_main_thread;

View File

@ -1684,6 +1684,8 @@ static void __kmp_stg_parse_barrier_pattern(char const *name, char const *value,
const char *var;
/* ---------- Barrier method control ------------ */
static int dist_req = 0, non_dist_req = 0;
static bool warn = 1;
for (int i = bs_plain_barrier; i < bs_last_barrier; i++) {
var = __kmp_barrier_pattern_env_name[i];
@ -1695,6 +1697,11 @@ static void __kmp_stg_parse_barrier_pattern(char const *name, char const *value,
for (j = bp_linear_bar; j < bp_last_bar; j++) {
if (__kmp_match_with_sentinel(__kmp_barrier_pattern_name[j], value, 1,
',')) {
if (j == bp_dist_bar) {
dist_req++;
} else {
non_dist_req++;
}
__kmp_barrier_gather_pattern[i] = (kmp_bar_pat_e)j;
break;
}
@ -1709,6 +1716,11 @@ static void __kmp_stg_parse_barrier_pattern(char const *name, char const *value,
if (comma != NULL) {
for (j = bp_linear_bar; j < bp_last_bar; j++) {
if (__kmp_str_match(__kmp_barrier_pattern_name[j], 1, comma + 1)) {
if (j == bp_dist_bar) {
dist_req++;
} else {
non_dist_req++;
}
__kmp_barrier_release_pattern[i] = (kmp_bar_pat_e)j;
break;
}
@ -1723,6 +1735,28 @@ static void __kmp_stg_parse_barrier_pattern(char const *name, char const *value,
}
}
}
if ((dist_req == 0) && (non_dist_req != 0)) {
// Something was set to a barrier other than dist; set all others to hyper
for (int i = bs_plain_barrier; i < bs_last_barrier; i++) {
if (__kmp_barrier_release_pattern[i] == bp_dist_bar)
__kmp_barrier_release_pattern[i] = bp_hyper_bar;
if (__kmp_barrier_gather_pattern[i] == bp_dist_bar)
__kmp_barrier_gather_pattern[i] = bp_hyper_bar;
}
} else if (non_dist_req != 0) {
// some requests for dist, plus requests for others; set all to dist
if (non_dist_req > 0 && dist_req > 0 && warn) {
KMP_INFORM(BarrierPatternOverride, name,
__kmp_barrier_pattern_name[bp_dist_bar]);
warn = 0;
}
for (int i = bs_plain_barrier; i < bs_last_barrier; i++) {
if (__kmp_barrier_release_pattern[i] != bp_dist_bar)
__kmp_barrier_release_pattern[i] = bp_dist_bar;
if (__kmp_barrier_gather_pattern[i] != bp_dist_bar)
__kmp_barrier_gather_pattern[i] = bp_dist_bar;
}
}
} // __kmp_stg_parse_barrier_pattern
static void __kmp_stg_print_barrier_pattern(kmp_str_buf_t *buffer,
@ -1739,7 +1773,7 @@ static void __kmp_stg_print_barrier_pattern(kmp_str_buf_t *buffer,
__kmp_str_buf_print(buffer, " %s='",
__kmp_barrier_pattern_env_name[i]);
}
KMP_DEBUG_ASSERT(j < bs_last_barrier && k < bs_last_barrier);
KMP_DEBUG_ASSERT(j < bp_last_bar && k < bp_last_bar);
__kmp_str_buf_print(buffer, "%s,%s'\n", __kmp_barrier_pattern_name[j],
__kmp_barrier_pattern_name[k]);
}

View File

@ -246,6 +246,8 @@ enum stats_state_e {
// KMP_tree_release -- time in __kmp_tree_barrier_release
// KMP_hyper_gather -- time in __kmp_hyper_barrier_gather
// KMP_hyper_release -- time in __kmp_hyper_barrier_release
// KMP_dist_gather -- time in __kmp_dist_barrier_gather
// KMP_dist_release -- time in __kmp_dist_barrier_release
// clang-format off
#define KMP_FOREACH_DEVELOPER_TIMER(macro, arg) \
macro(KMP_fork_call, 0, arg) \
@ -255,6 +257,8 @@ enum stats_state_e {
macro(KMP_hier_release, 0, arg) \
macro(KMP_hyper_gather, 0, arg) \
macro(KMP_hyper_release, 0, arg) \
macro(KMP_dist_gather, 0, arg) \
macro(KMP_dist_release, 0, arg) \
macro(KMP_linear_gather, 0, arg) \
macro(KMP_linear_release, 0, arg) \
macro(KMP_tree_gather, 0, arg) \

View File

@ -515,6 +515,31 @@ int __kmp_str_match(char const *target, int len, char const *data) {
return ((len > 0) ? i >= len : (!target[i] && (len || !data[i])));
} // __kmp_str_match
// If data contains all of target, returns true, otherwise returns false.
// len should be the length of target
bool __kmp_str_contains(char const *target, int len, char const *data) {
int i = 0, j = 0, start = 0;
if (target == NULL || data == NULL) {
return FALSE;
}
while (target[i]) {
if (!data[j])
return FALSE;
if (TOLOWER(target[i]) != TOLOWER(data[j])) {
j = start + 1;
start = j;
i = 0;
} else {
if (i == 0)
start = j;
j++;
i++;
}
}
return i == len;
} // __kmp_str_contains
int __kmp_str_match_false(char const *data) {
int result =
__kmp_str_match("false", 1, data) || __kmp_str_match("off", 2, data) ||

View File

@ -106,6 +106,7 @@ int __kmp_str_eqf(char const *lhs, char const *rhs);
char *__kmp_str_format(char const *format, ...);
void __kmp_str_free(char **str);
int __kmp_str_match(char const *target, int len, char const *data);
bool __kmp_str_contains(char const *target, int len, char const *data);
int __kmp_str_match_false(char const *data);
int __kmp_str_match_true(char const *data);
void __kmp_str_replace(char *str, char search_for, char replace_with);

View File

@ -2947,8 +2947,7 @@ static inline int __kmp_execute_tasks_template(
(TCR_PTR(CCAST(void *, other_thread->th.th_sleep_loc)) !=
NULL)) {
asleep = 1;
__kmp_null_resume_wrapper(__kmp_gtid_from_thread(other_thread),
other_thread->th.th_sleep_loc);
__kmp_null_resume_wrapper(other_thread);
// A sleeping thread should not have any tasks on it's queue.
// There is a slight possibility that it resumes, steals a task
// from another thread, which spawns more tasks, all in the time
@ -3097,6 +3096,16 @@ int __kmp_execute_tasks_64(
thread_finished USE_ITT_BUILD_ARG(itt_sync_obj), is_constrained);
}
template <bool C, bool S>
int __kmp_atomic_execute_tasks_64(
kmp_info_t *thread, kmp_int32 gtid, kmp_atomic_flag_64<C, S> *flag,
int final_spin, int *thread_finished USE_ITT_BUILD_ARG(void *itt_sync_obj),
kmp_int32 is_constrained) {
return __kmp_execute_tasks_template(
thread, gtid, flag, final_spin,
thread_finished USE_ITT_BUILD_ARG(itt_sync_obj), is_constrained);
}
int __kmp_execute_tasks_oncore(
kmp_info_t *thread, kmp_int32 gtid, kmp_flag_oncore *flag, int final_spin,
int *thread_finished USE_ITT_BUILD_ARG(void *itt_sync_obj),
@ -3123,6 +3132,14 @@ template int __kmp_execute_tasks_64<true, false>(kmp_info_t *, kmp_int32,
int *USE_ITT_BUILD_ARG(void *),
kmp_int32);
template int __kmp_atomic_execute_tasks_64<false, true>(
kmp_info_t *, kmp_int32, kmp_atomic_flag_64<false, true> *, int,
int *USE_ITT_BUILD_ARG(void *), kmp_int32);
template int __kmp_atomic_execute_tasks_64<true, false>(
kmp_info_t *, kmp_int32, kmp_atomic_flag_64<true, false> *, int,
int *USE_ITT_BUILD_ARG(void *), kmp_int32);
// __kmp_enable_tasking: Allocate task team and resume threads sleeping at the
// next barrier so they can assist in executing enqueued tasks.
// First thread in allocates the task team atomically.
@ -3161,7 +3178,7 @@ static void __kmp_enable_tasking(kmp_task_team_t *task_team,
// tasks and execute them. In extra barrier mode, tasks do not sleep
// at the separate tasking barrier, so this isn't a problem.
for (i = 0; i < nthreads; i++) {
volatile void *sleep_loc;
void *sleep_loc;
kmp_info_t *thread = threads_data[i].td.td_thr;
if (i == this_thr->th.th_info.ds.ds_tid) {
@ -3178,7 +3195,7 @@ static void __kmp_enable_tasking(kmp_task_team_t *task_team,
KF_TRACE(50, ("__kmp_enable_tasking: T#%d waking up thread T#%d\n",
__kmp_gtid_from_thread(this_thr),
__kmp_gtid_from_thread(thread)));
__kmp_null_resume_wrapper(__kmp_gtid_from_thread(thread), sleep_loc);
__kmp_null_resume_wrapper(thread);
} else {
KF_TRACE(50, ("__kmp_enable_tasking: T#%d don't wake up thread T#%d\n",
__kmp_gtid_from_thread(this_thr),
@ -3546,7 +3563,7 @@ void __kmp_wait_to_unref_task_teams(void) {
__kmp_gtid_from_thread(thread)));
if (__kmp_dflt_blocktime != KMP_MAX_BLOCKTIME) {
volatile void *sleep_loc;
void *sleep_loc;
// If the thread is sleeping, awaken it.
if ((sleep_loc = TCR_PTR(CCAST(void *, thread->th.th_sleep_loc))) !=
NULL) {
@ -3554,7 +3571,7 @@ void __kmp_wait_to_unref_task_teams(void) {
10,
("__kmp_wait_to_unref_task_team: T#%d waking up thread T#%d\n",
__kmp_gtid_from_thread(thread), __kmp_gtid_from_thread(thread)));
__kmp_null_resume_wrapper(__kmp_gtid_from_thread(thread), sleep_loc);
__kmp_null_resume_wrapper(thread);
}
}
}

View File

@ -33,6 +33,10 @@ template <bool C, bool S>
void __kmp_mwait_64(int th_gtid, kmp_flag_64<C, S> *flag) {
__kmp_mwait_template(th_gtid, flag);
}
template <bool C, bool S>
void __kmp_atomic_mwait_64(int th_gtid, kmp_atomic_flag_64<C, S> *flag) {
__kmp_mwait_template(th_gtid, flag);
}
void __kmp_mwait_oncore(int th_gtid, kmp_flag_oncore *flag) {
__kmp_mwait_template(th_gtid, flag);
}
@ -40,4 +44,8 @@ void __kmp_mwait_oncore(int th_gtid, kmp_flag_oncore *flag) {
template void __kmp_mwait_32<false, false>(int, kmp_flag_32<false, false> *);
template void __kmp_mwait_64<false, true>(int, kmp_flag_64<false, true> *);
template void __kmp_mwait_64<true, false>(int, kmp_flag_64<true, false> *);
template void
__kmp_atomic_mwait_64<false, true>(int, kmp_atomic_flag_64<false, true> *);
template void
__kmp_atomic_mwait_64<true, false>(int, kmp_atomic_flag_64<true, false> *);
#endif

View File

@ -33,96 +33,288 @@ higher level operations such as barriers and fork/join.
@{
*/
/*!
* The flag_type describes the storage used for the flag.
*/
enum flag_type {
flag32, /**< 32 bit flags */
flag64, /**< 64 bit flags */
flag_oncore /**< special 64-bit flag for on-core barrier (hierarchical) */
};
struct flag_properties {
unsigned int type : 16;
unsigned int reserved : 16;
};
/*!
* Base class for wait/release volatile flag
*/
template <typename P> class kmp_flag_native {
volatile P *loc;
flag_properties t;
template <enum flag_type FlagType> struct flag_traits {};
public:
typedef P flag_t;
kmp_flag_native(volatile P *p, flag_type ft)
: loc(p), t({(short unsigned int)ft, 0U}) {}
volatile P *get() { return loc; }
void *get_void_p() { return RCAST(void *, CCAST(P *, loc)); }
void set(volatile P *new_loc) { loc = new_loc; }
flag_type get_type() { return (flag_type)(t.type); }
P load() { return *loc; }
void store(P val) { *loc = val; }
template <> struct flag_traits<flag32> {
typedef kmp_uint32 flag_t;
static const flag_type t = flag32;
static inline flag_t tcr(flag_t f) { return TCR_4(f); }
static inline flag_t test_then_add4(volatile flag_t *f) {
return KMP_TEST_THEN_ADD4_32(RCAST(volatile kmp_int32 *, f));
}
static inline flag_t test_then_or(volatile flag_t *f, flag_t v) {
return KMP_TEST_THEN_OR32(f, v);
}
static inline flag_t test_then_and(volatile flag_t *f, flag_t v) {
return KMP_TEST_THEN_AND32(f, v);
}
};
/*!
* Base class for wait/release atomic flag
*/
template <typename P> class kmp_flag {
std::atomic<P>
*loc; /**< Pointer to the flag storage that is modified by another thread
*/
template <> struct flag_traits<atomic_flag64> {
typedef kmp_uint64 flag_t;
static const flag_type t = atomic_flag64;
static inline flag_t tcr(flag_t f) { return TCR_8(f); }
static inline flag_t test_then_add4(volatile flag_t *f) {
return KMP_TEST_THEN_ADD4_64(RCAST(volatile kmp_int64 *, f));
}
static inline flag_t test_then_or(volatile flag_t *f, flag_t v) {
return KMP_TEST_THEN_OR64(f, v);
}
static inline flag_t test_then_and(volatile flag_t *f, flag_t v) {
return KMP_TEST_THEN_AND64(f, v);
}
};
template <> struct flag_traits<flag64> {
typedef kmp_uint64 flag_t;
static const flag_type t = flag64;
static inline flag_t tcr(flag_t f) { return TCR_8(f); }
static inline flag_t test_then_add4(volatile flag_t *f) {
return KMP_TEST_THEN_ADD4_64(RCAST(volatile kmp_int64 *, f));
}
static inline flag_t test_then_or(volatile flag_t *f, flag_t v) {
return KMP_TEST_THEN_OR64(f, v);
}
static inline flag_t test_then_and(volatile flag_t *f, flag_t v) {
return KMP_TEST_THEN_AND64(f, v);
}
};
template <> struct flag_traits<flag_oncore> {
typedef kmp_uint64 flag_t;
static const flag_type t = flag_oncore;
static inline flag_t tcr(flag_t f) { return TCR_8(f); }
static inline flag_t test_then_add4(volatile flag_t *f) {
return KMP_TEST_THEN_ADD4_64(RCAST(volatile kmp_int64 *, f));
}
static inline flag_t test_then_or(volatile flag_t *f, flag_t v) {
return KMP_TEST_THEN_OR64(f, v);
}
static inline flag_t test_then_and(volatile flag_t *f, flag_t v) {
return KMP_TEST_THEN_AND64(f, v);
}
};
/*! Base class for all flags */
template <flag_type FlagType> class kmp_flag {
protected:
flag_properties t; /**< "Type" of the flag in loc */
kmp_info_t *waiting_threads[1]; /**< Threads sleeping on this thread. */
kmp_uint32 num_waiting_threads; /**< #threads sleeping on this thread. */
std::atomic<bool> *sleepLoc;
public:
typedef P flag_t;
kmp_flag(std::atomic<P> *p, flag_type ft)
: loc(p), t({(short unsigned int)ft, 0U}) {}
/*!
* @result the pointer to the actual flag
*/
std::atomic<P> *get() { return loc; }
/*!
* @result void* pointer to the actual flag
*/
void *get_void_p() { return RCAST(void *, loc); }
/*!
* @param new_loc in set loc to point at new_loc
*/
void set(std::atomic<P> *new_loc) { loc = new_loc; }
/*!
* @result the flag_type
*/
typedef flag_traits<FlagType> traits_type;
kmp_flag() : t({FlagType, 0U}), num_waiting_threads(0), sleepLoc(nullptr) {}
kmp_flag(int nwaiters)
: t({FlagType, 0U}), num_waiting_threads(nwaiters), sleepLoc(nullptr) {}
kmp_flag(std::atomic<bool> *sloc)
: t({FlagType, 0U}), num_waiting_threads(0), sleepLoc(sloc) {}
/*! @result the flag_type */
flag_type get_type() { return (flag_type)(t.type); }
/*!
* @result flag value
*/
P load() { return loc->load(std::memory_order_acquire); }
/*!
* @param val the new flag value to be stored
*/
void store(P val) { loc->store(val, std::memory_order_release); }
// Derived classes must provide the following:
/*
kmp_info_t * get_waiter(kmp_uint32 i);
kmp_uint32 get_num_waiters();
bool done_check();
bool done_check_val(P old_loc);
bool notdone_check();
P internal_release();
void suspend(int th_gtid);
void mwait(int th_gtid);
void resume(int th_gtid);
P set_sleeping();
P unset_sleeping();
bool is_sleeping();
bool is_any_sleeping();
bool is_sleeping_val(P old_loc);
int execute_tasks(kmp_info_t *this_thr, kmp_int32 gtid, int final_spin,
int *thread_finished
USE_ITT_BUILD_ARG(void * itt_sync_obj), kmp_int32
is_constrained);
*/
/*! param i in index into waiting_threads
* @result the thread that is waiting at index i */
kmp_info_t *get_waiter(kmp_uint32 i) {
KMP_DEBUG_ASSERT(i < num_waiting_threads);
return waiting_threads[i];
}
/*! @result num_waiting_threads */
kmp_uint32 get_num_waiters() { return num_waiting_threads; }
/*! @param thr in the thread which is now waiting
* Insert a waiting thread at index 0. */
void set_waiter(kmp_info_t *thr) {
waiting_threads[0] = thr;
num_waiting_threads = 1;
}
enum barrier_type get_bt() { return bs_last_barrier; }
};
/*! Base class for wait/release volatile flag */
template <typename PtrType, flag_type FlagType, bool Sleepable>
class kmp_flag_native : public kmp_flag<FlagType> {
protected:
volatile PtrType *loc;
PtrType checker; /**< When flag==checker, it has been released. */
typedef flag_traits<FlagType> traits_type;
public:
typedef PtrType flag_t;
kmp_flag_native(volatile PtrType *p) : kmp_flag<FlagType>(), loc(p) {}
kmp_flag_native(volatile PtrType *p, kmp_info_t *thr)
: kmp_flag<FlagType>(1), loc(p) {
this->waiting_threads[0] = thr;
}
kmp_flag_native(volatile PtrType *p, PtrType c)
: kmp_flag<FlagType>(), loc(p), checker(c) {}
kmp_flag_native(volatile PtrType *p, PtrType c, std::atomic<bool> *sloc)
: kmp_flag<FlagType>(sloc), loc(p), checker(c) {}
virtual ~kmp_flag_native() {}
void *operator new(size_t size) { return __kmp_allocate(size); }
void operator delete(void *p) { __kmp_free(p); }
volatile PtrType *get() { return loc; }
void *get_void_p() { return RCAST(void *, CCAST(PtrType *, loc)); }
void set(volatile PtrType *new_loc) { loc = new_loc; }
PtrType load() { return *loc; }
void store(PtrType val) { *loc = val; }
/*! @result true if the flag object has been released. */
virtual bool done_check() {
if (Sleepable && !(this->sleepLoc))
return (traits_type::tcr(*(this->get())) & ~KMP_BARRIER_SLEEP_STATE) ==
checker;
else
return traits_type::tcr(*(this->get())) == checker;
}
/*! @param old_loc in old value of flag
* @result true if the flag's old value indicates it was released. */
virtual bool done_check_val(PtrType old_loc) { return old_loc == checker; }
/*! @result true if the flag object is not yet released.
* Used in __kmp_wait_template like:
* @code
* while (flag.notdone_check()) { pause(); }
* @endcode */
virtual bool notdone_check() {
return traits_type::tcr(*(this->get())) != checker;
}
/*! @result Actual flag value before release was applied.
* Trigger all waiting threads to run by modifying flag to release state. */
void internal_release() {
(void)traits_type::test_then_add4((volatile PtrType *)this->get());
}
/*! @result Actual flag value before sleep bit(s) set.
* Notes that there is at least one thread sleeping on the flag by setting
* sleep bit(s). */
PtrType set_sleeping() {
if (this->sleepLoc) {
this->sleepLoc->store(true);
return *(this->get());
}
return traits_type::test_then_or((volatile PtrType *)this->get(),
KMP_BARRIER_SLEEP_STATE);
}
/*! @result Actual flag value before sleep bit(s) cleared.
* Notes that there are no longer threads sleeping on the flag by clearing
* sleep bit(s). */
void unset_sleeping() {
if (this->sleepLoc) {
this->sleepLoc->store(false);
return;
}
traits_type::test_then_and((volatile PtrType *)this->get(),
~KMP_BARRIER_SLEEP_STATE);
}
/*! @param old_loc in old value of flag
* Test if there are threads sleeping on the flag's old value in old_loc. */
bool is_sleeping_val(PtrType old_loc) {
if (this->sleepLoc)
return this->sleepLoc->load();
return old_loc & KMP_BARRIER_SLEEP_STATE;
}
/*! Test whether there are threads sleeping on the flag. */
bool is_sleeping() {
if (this->sleepLoc)
return this->sleepLoc->load();
return is_sleeping_val(*(this->get()));
}
bool is_any_sleeping() {
if (this->sleepLoc)
return this->sleepLoc->load();
return is_sleeping_val(*(this->get()));
}
kmp_uint8 *get_stolen() { return NULL; }
};
/*! Base class for wait/release atomic flag */
template <typename PtrType, flag_type FlagType, bool Sleepable>
class kmp_flag_atomic : public kmp_flag<FlagType> {
protected:
std::atomic<PtrType> *loc; /**< Pointer to flag location to wait on */
PtrType checker; /**< Flag == checker means it has been released. */
public:
typedef flag_traits<FlagType> traits_type;
typedef PtrType flag_t;
kmp_flag_atomic(std::atomic<PtrType> *p) : kmp_flag<FlagType>(), loc(p) {}
kmp_flag_atomic(std::atomic<PtrType> *p, kmp_info_t *thr)
: kmp_flag<FlagType>(1), loc(p) {
this->waiting_threads[0] = thr;
}
kmp_flag_atomic(std::atomic<PtrType> *p, PtrType c)
: kmp_flag<FlagType>(), loc(p), checker(c) {}
kmp_flag_atomic(std::atomic<PtrType> *p, PtrType c, std::atomic<bool> *sloc)
: kmp_flag<FlagType>(sloc), loc(p), checker(c) {}
/*! @result the pointer to the actual flag */
std::atomic<PtrType> *get() { return loc; }
/*! @result void* pointer to the actual flag */
void *get_void_p() { return RCAST(void *, loc); }
/*! @param new_loc in set loc to point at new_loc */
void set(std::atomic<PtrType> *new_loc) { loc = new_loc; }
/*! @result flag value */
PtrType load() { return loc->load(std::memory_order_acquire); }
/*! @param val the new flag value to be stored */
void store(PtrType val) { loc->store(val, std::memory_order_release); }
/*! @result true if the flag object has been released. */
bool done_check() {
if (Sleepable && !(this->sleepLoc))
return (this->load() & ~KMP_BARRIER_SLEEP_STATE) == checker;
else
return this->load() == checker;
}
/*! @param old_loc in old value of flag
* @result true if the flag's old value indicates it was released. */
bool done_check_val(PtrType old_loc) { return old_loc == checker; }
/*! @result true if the flag object is not yet released.
* Used in __kmp_wait_template like:
* @code
* while (flag.notdone_check()) { pause(); }
* @endcode */
bool notdone_check() { return this->load() != checker; }
/*! @result Actual flag value before release was applied.
* Trigger all waiting threads to run by modifying flag to release state. */
void internal_release() { KMP_ATOMIC_ADD(this->get(), 4); }
/*! @result Actual flag value before sleep bit(s) set.
* Notes that there is at least one thread sleeping on the flag by setting
* sleep bit(s). */
PtrType set_sleeping() {
if (this->sleepLoc) {
this->sleepLoc->store(true);
return *(this->get());
}
return KMP_ATOMIC_OR(this->get(), KMP_BARRIER_SLEEP_STATE);
}
/*! @result Actual flag value before sleep bit(s) cleared.
* Notes that there are no longer threads sleeping on the flag by clearing
* sleep bit(s). */
void unset_sleeping() {
if (this->sleepLoc) {
this->sleepLoc->store(false);
return;
}
KMP_ATOMIC_AND(this->get(), ~KMP_BARRIER_SLEEP_STATE);
}
/*! @param old_loc in old value of flag
* Test whether there are threads sleeping on flag's old value in old_loc. */
bool is_sleeping_val(PtrType old_loc) {
if (this->sleepLoc)
return this->sleepLoc->load();
return old_loc & KMP_BARRIER_SLEEP_STATE;
}
/*! Test whether there are threads sleeping on the flag. */
bool is_sleeping() {
if (this->sleepLoc)
return this->sleepLoc->load();
return is_sleeping_val(this->load());
}
bool is_any_sleeping() {
if (this->sleepLoc)
return this->sleepLoc->load();
return is_sleeping_val(this->load());
}
kmp_uint8 *get_stolen() { return NULL; }
};
#if OMPT_SUPPORT
@ -264,8 +456,9 @@ final_spin=FALSE)
ompt_entry_state = this_thr->th.ompt_thread_info.state;
if (!final_spin || ompt_entry_state != ompt_state_wait_barrier_implicit ||
KMP_MASTER_TID(this_thr->th.th_info.ds.ds_tid)) {
ompt_lw_taskteam_t *team =
this_thr->th.th_team->t.ompt_serialized_team_info;
ompt_lw_taskteam_t *team = NULL;
if (this_thr->th.th_team)
team = this_thr->th.th_team->t.ompt_serialized_team_info;
if (team) {
tId = &(team->ompt_task_info.task_data);
} else {
@ -340,11 +533,11 @@ final_spin=FALSE)
disabled (KMP_TASKING=0). */
if (task_team != NULL) {
if (TCR_SYNC_4(task_team->tt.tt_active)) {
if (KMP_TASKING_ENABLED(task_team))
if (KMP_TASKING_ENABLED(task_team)) {
flag->execute_tasks(
this_thr, th_gtid, final_spin,
&tasks_completed USE_ITT_BUILD_ARG(itt_sync_obj), 0);
else
} else
this_thr->th.th_reap_state = KMP_SAFE_TO_REAP;
} else {
KMP_DEBUG_ASSERT(!KMP_MASTER_TID(this_thr->th.th_info.ds.ds_tid));
@ -557,6 +750,7 @@ static inline void __kmp_mwait_template(int th_gtid, C *flag) {
else {
// if flag changes here, wake-up happens immediately
TCW_PTR(th->th.th_sleep_loc, (void *)flag);
th->th.th_sleep_loc_type = flag->get_type();
__kmp_unlock_suspend_mx(th);
KF_TRACE(50, ("__kmp_mwait_template: T#%d calling mwait\n", th_gtid));
#if KMP_HAVE_UMWAIT
@ -574,6 +768,7 @@ static inline void __kmp_mwait_template(int th_gtid, C *flag) {
if (flag->is_sleeping())
flag->unset_sleeping();
TCW_PTR(th->th.th_sleep_loc, NULL);
th->th.th_sleep_loc_type = flag_unset;
}
// Mark thread as active again
th->th.th_active = TRUE;
@ -624,251 +819,15 @@ template <class C> static inline void __kmp_release_template(C *flag) {
}
}
template <typename FlagType> struct flag_traits {};
template <> struct flag_traits<kmp_uint32> {
typedef kmp_uint32 flag_t;
static const flag_type t = flag32;
static inline flag_t tcr(flag_t f) { return TCR_4(f); }
static inline flag_t test_then_add4(volatile flag_t *f) {
return KMP_TEST_THEN_ADD4_32(RCAST(volatile kmp_int32 *, f));
}
static inline flag_t test_then_or(volatile flag_t *f, flag_t v) {
return KMP_TEST_THEN_OR32(f, v);
}
static inline flag_t test_then_and(volatile flag_t *f, flag_t v) {
return KMP_TEST_THEN_AND32(f, v);
}
};
template <> struct flag_traits<kmp_uint64> {
typedef kmp_uint64 flag_t;
static const flag_type t = flag64;
static inline flag_t tcr(flag_t f) { return TCR_8(f); }
static inline flag_t test_then_add4(volatile flag_t *f) {
return KMP_TEST_THEN_ADD4_64(RCAST(volatile kmp_int64 *, f));
}
static inline flag_t test_then_or(volatile flag_t *f, flag_t v) {
return KMP_TEST_THEN_OR64(f, v);
}
static inline flag_t test_then_and(volatile flag_t *f, flag_t v) {
return KMP_TEST_THEN_AND64(f, v);
}
};
// Basic flag that does not use C11 Atomics
template <typename FlagType, bool Sleepable>
class kmp_basic_flag_native : public kmp_flag_native<FlagType> {
typedef flag_traits<FlagType> traits_type;
FlagType checker; /**< Value to compare flag to to check if flag has been
released. */
kmp_info_t
*waiting_threads[1]; /**< Array of threads sleeping on this thread. */
kmp_uint32
num_waiting_threads; /**< Number of threads sleeping on this thread. */
public:
kmp_basic_flag_native(volatile FlagType *p)
: kmp_flag_native<FlagType>(p, traits_type::t), num_waiting_threads(0) {}
kmp_basic_flag_native(volatile FlagType *p, kmp_info_t *thr)
: kmp_flag_native<FlagType>(p, traits_type::t), num_waiting_threads(1) {
waiting_threads[0] = thr;
}
kmp_basic_flag_native(volatile FlagType *p, FlagType c)
: kmp_flag_native<FlagType>(p, traits_type::t), checker(c),
num_waiting_threads(0) {}
/*!
* param i in index into waiting_threads
* @result the thread that is waiting at index i
*/
kmp_info_t *get_waiter(kmp_uint32 i) {
KMP_DEBUG_ASSERT(i < num_waiting_threads);
return waiting_threads[i];
}
/*!
* @result num_waiting_threads
*/
kmp_uint32 get_num_waiters() { return num_waiting_threads; }
/*!
* @param thr in the thread which is now waiting
*
* Insert a waiting thread at index 0.
*/
void set_waiter(kmp_info_t *thr) {
waiting_threads[0] = thr;
num_waiting_threads = 1;
}
/*!
* @result true if the flag object has been released.
*/
bool done_check() {
if (Sleepable)
return (traits_type::tcr(*(this->get())) & ~KMP_BARRIER_SLEEP_STATE) ==
checker;
else
return traits_type::tcr(*(this->get())) == checker;
}
/*!
* @param old_loc in old value of flag
* @result true if the flag's old value indicates it was released.
*/
bool done_check_val(FlagType old_loc) { return old_loc == checker; }
/*!
* @result true if the flag object is not yet released.
* Used in __kmp_wait_template like:
* @code
* while (flag.notdone_check()) { pause(); }
* @endcode
*/
bool notdone_check() { return traits_type::tcr(*(this->get())) != checker; }
/*!
* @result Actual flag value before release was applied.
* Trigger all waiting threads to run by modifying flag to release state.
*/
void internal_release() {
(void)traits_type::test_then_add4((volatile FlagType *)this->get());
}
/*!
* @result Actual flag value before sleep bit(s) set.
* Notes that there is at least one thread sleeping on the flag by setting
* sleep bit(s).
*/
FlagType set_sleeping() {
return traits_type::test_then_or((volatile FlagType *)this->get(),
KMP_BARRIER_SLEEP_STATE);
}
/*!
* @result Actual flag value before sleep bit(s) cleared.
* Notes that there are no longer threads sleeping on the flag by clearing
* sleep bit(s).
*/
FlagType unset_sleeping() {
return traits_type::test_then_and((volatile FlagType *)this->get(),
~KMP_BARRIER_SLEEP_STATE);
}
/*!
* @param old_loc in old value of flag
* Test whether there are threads sleeping on the flag's old value in old_loc.
*/
bool is_sleeping_val(FlagType old_loc) {
return old_loc & KMP_BARRIER_SLEEP_STATE;
}
/*!
* Test whether there are threads sleeping on the flag.
*/
bool is_sleeping() { return is_sleeping_val(*(this->get())); }
bool is_any_sleeping() { return is_sleeping_val(*(this->get())); }
kmp_uint8 *get_stolen() { return NULL; }
enum barrier_type get_bt() { return bs_last_barrier; }
};
template <typename FlagType, bool Sleepable>
class kmp_basic_flag : public kmp_flag<FlagType> {
typedef flag_traits<FlagType> traits_type;
FlagType checker; /**< Value to compare flag to to check if flag has been
released. */
kmp_info_t
*waiting_threads[1]; /**< Array of threads sleeping on this thread. */
kmp_uint32
num_waiting_threads; /**< Number of threads sleeping on this thread. */
public:
kmp_basic_flag(std::atomic<FlagType> *p)
: kmp_flag<FlagType>(p, traits_type::t), num_waiting_threads(0) {}
kmp_basic_flag(std::atomic<FlagType> *p, kmp_info_t *thr)
: kmp_flag<FlagType>(p, traits_type::t), num_waiting_threads(1) {
waiting_threads[0] = thr;
}
kmp_basic_flag(std::atomic<FlagType> *p, FlagType c)
: kmp_flag<FlagType>(p, traits_type::t), checker(c),
num_waiting_threads(0) {}
/*!
* param i in index into waiting_threads
* @result the thread that is waiting at index i
*/
kmp_info_t *get_waiter(kmp_uint32 i) {
KMP_DEBUG_ASSERT(i < num_waiting_threads);
return waiting_threads[i];
}
/*!
* @result num_waiting_threads
*/
kmp_uint32 get_num_waiters() { return num_waiting_threads; }
/*!
* @param thr in the thread which is now waiting
*
* Insert a waiting thread at index 0.
*/
void set_waiter(kmp_info_t *thr) {
waiting_threads[0] = thr;
num_waiting_threads = 1;
}
/*!
* @result true if the flag object has been released.
*/
bool done_check() {
if (Sleepable)
return (this->load() & ~KMP_BARRIER_SLEEP_STATE) == checker;
else
return this->load() == checker;
}
/*!
* @param old_loc in old value of flag
* @result true if the flag's old value indicates it was released.
*/
bool done_check_val(FlagType old_loc) { return old_loc == checker; }
/*!
* @result true if the flag object is not yet released.
* Used in __kmp_wait_template like:
* @code
* while (flag.notdone_check()) { pause(); }
* @endcode
*/
bool notdone_check() { return this->load() != checker; }
/*!
* @result Actual flag value before release was applied.
* Trigger all waiting threads to run by modifying flag to release state.
*/
void internal_release() { KMP_ATOMIC_ADD(this->get(), 4); }
/*!
* @result Actual flag value before sleep bit(s) set.
* Notes that there is at least one thread sleeping on the flag by setting
* sleep bit(s).
*/
FlagType set_sleeping() {
return KMP_ATOMIC_OR(this->get(), KMP_BARRIER_SLEEP_STATE);
}
/*!
* @result Actual flag value before sleep bit(s) cleared.
* Notes that there are no longer threads sleeping on the flag by clearing
* sleep bit(s).
*/
FlagType unset_sleeping() {
return KMP_ATOMIC_AND(this->get(), ~KMP_BARRIER_SLEEP_STATE);
}
/*!
* @param old_loc in old value of flag
* Test whether there are threads sleeping on the flag's old value in old_loc.
*/
bool is_sleeping_val(FlagType old_loc) {
return old_loc & KMP_BARRIER_SLEEP_STATE;
}
/*!
* Test whether there are threads sleeping on the flag.
*/
bool is_sleeping() { return is_sleeping_val(this->load()); }
bool is_any_sleeping() { return is_sleeping_val(this->load()); }
kmp_uint8 *get_stolen() { return NULL; }
enum barrier_type get_bt() { return bs_last_barrier; }
};
template <bool Cancellable, bool Sleepable>
class kmp_flag_32 : public kmp_basic_flag<kmp_uint32, Sleepable> {
class kmp_flag_32 : public kmp_flag_atomic<kmp_uint32, flag32, Sleepable> {
public:
kmp_flag_32(std::atomic<kmp_uint32> *p)
: kmp_basic_flag<kmp_uint32, Sleepable>(p) {}
: kmp_flag_atomic<kmp_uint32, flag32, Sleepable>(p) {}
kmp_flag_32(std::atomic<kmp_uint32> *p, kmp_info_t *thr)
: kmp_basic_flag<kmp_uint32, Sleepable>(p, thr) {}
: kmp_flag_atomic<kmp_uint32, flag32, Sleepable>(p, thr) {}
kmp_flag_32(std::atomic<kmp_uint32> *p, kmp_uint32 c)
: kmp_basic_flag<kmp_uint32, Sleepable>(p, c) {}
: kmp_flag_atomic<kmp_uint32, flag32, Sleepable>(p, c) {}
void suspend(int th_gtid) { __kmp_suspend_32(th_gtid, this); }
#if KMP_HAVE_MWAIT || KMP_HAVE_UMWAIT
void mwait(int th_gtid) { __kmp_mwait_32(th_gtid, this); }
@ -895,14 +854,16 @@ public:
};
template <bool Cancellable, bool Sleepable>
class kmp_flag_64 : public kmp_basic_flag_native<kmp_uint64, Sleepable> {
class kmp_flag_64 : public kmp_flag_native<kmp_uint64, flag64, Sleepable> {
public:
kmp_flag_64(volatile kmp_uint64 *p)
: kmp_basic_flag_native<kmp_uint64, Sleepable>(p) {}
: kmp_flag_native<kmp_uint64, flag64, Sleepable>(p) {}
kmp_flag_64(volatile kmp_uint64 *p, kmp_info_t *thr)
: kmp_basic_flag_native<kmp_uint64, Sleepable>(p, thr) {}
: kmp_flag_native<kmp_uint64, flag64, Sleepable>(p, thr) {}
kmp_flag_64(volatile kmp_uint64 *p, kmp_uint64 c)
: kmp_basic_flag_native<kmp_uint64, Sleepable>(p, c) {}
: kmp_flag_native<kmp_uint64, flag64, Sleepable>(p, c) {}
kmp_flag_64(volatile kmp_uint64 *p, kmp_uint64 c, std::atomic<bool> *loc)
: kmp_flag_native<kmp_uint64, flag64, Sleepable>(p, c, loc) {}
void suspend(int th_gtid) { __kmp_suspend_64(th_gtid, this); }
#if KMP_HAVE_MWAIT || KMP_HAVE_UMWAIT
void mwait(int th_gtid) { __kmp_mwait_64(th_gtid, this); }
@ -928,20 +889,52 @@ public:
flag_type get_ptr_type() { return flag64; }
};
template <bool Cancellable, bool Sleepable>
class kmp_atomic_flag_64
: public kmp_flag_atomic<kmp_uint64, atomic_flag64, Sleepable> {
public:
kmp_atomic_flag_64(std::atomic<kmp_uint64> *p)
: kmp_flag_atomic<kmp_uint64, atomic_flag64, Sleepable>(p) {}
kmp_atomic_flag_64(std::atomic<kmp_uint64> *p, kmp_info_t *thr)
: kmp_flag_atomic<kmp_uint64, atomic_flag64, Sleepable>(p, thr) {}
kmp_atomic_flag_64(std::atomic<kmp_uint64> *p, kmp_uint64 c)
: kmp_flag_atomic<kmp_uint64, atomic_flag64, Sleepable>(p, c) {}
kmp_atomic_flag_64(std::atomic<kmp_uint64> *p, kmp_uint64 c,
std::atomic<bool> *loc)
: kmp_flag_atomic<kmp_uint64, atomic_flag64, Sleepable>(p, c, loc) {}
void suspend(int th_gtid) { __kmp_atomic_suspend_64(th_gtid, this); }
void mwait(int th_gtid) { __kmp_atomic_mwait_64(th_gtid, this); }
void resume(int th_gtid) { __kmp_atomic_resume_64(th_gtid, this); }
int execute_tasks(kmp_info_t *this_thr, kmp_int32 gtid, int final_spin,
int *thread_finished USE_ITT_BUILD_ARG(void *itt_sync_obj),
kmp_int32 is_constrained) {
return __kmp_atomic_execute_tasks_64(
this_thr, gtid, this, final_spin,
thread_finished USE_ITT_BUILD_ARG(itt_sync_obj), is_constrained);
}
bool wait(kmp_info_t *this_thr,
int final_spin USE_ITT_BUILD_ARG(void *itt_sync_obj)) {
if (final_spin)
return __kmp_wait_template<kmp_atomic_flag_64, TRUE, Cancellable,
Sleepable>(
this_thr, this USE_ITT_BUILD_ARG(itt_sync_obj));
else
return __kmp_wait_template<kmp_atomic_flag_64, FALSE, Cancellable,
Sleepable>(
this_thr, this USE_ITT_BUILD_ARG(itt_sync_obj));
}
void release() { __kmp_release_template(this); }
flag_type get_ptr_type() { return atomic_flag64; }
};
// Hierarchical 64-bit on-core barrier instantiation
class kmp_flag_oncore : public kmp_flag_native<kmp_uint64> {
kmp_uint64 checker;
kmp_info_t *waiting_threads[1];
kmp_uint32 num_waiting_threads;
kmp_uint32
offset; /**< Portion of flag that is of interest for an operation. */
class kmp_flag_oncore : public kmp_flag_native<kmp_uint64, flag_oncore, false> {
kmp_uint32 offset; /**< Portion of flag of interest for an operation. */
bool flag_switch; /**< Indicates a switch in flag location. */
enum barrier_type bt; /**< Barrier type. */
kmp_info_t *this_thr; /**< Thread that may be redirected to different flag
location. */
kmp_info_t *this_thr; /**< Thread to redirect to different flag location. */
#if USE_ITT_BUILD
void *
itt_sync_obj; /**< ITT object that must be passed to new flag location. */
void *itt_sync_obj; /**< ITT object to pass to new flag location. */
#endif
unsigned char &byteref(volatile kmp_uint64 *loc, size_t offset) {
return (RCAST(unsigned char *, CCAST(kmp_uint64 *, loc)))[offset];
@ -949,31 +942,26 @@ class kmp_flag_oncore : public kmp_flag_native<kmp_uint64> {
public:
kmp_flag_oncore(volatile kmp_uint64 *p)
: kmp_flag_native<kmp_uint64>(p, flag_oncore), num_waiting_threads(0),
flag_switch(false) {}
: kmp_flag_native<kmp_uint64, flag_oncore, false>(p), flag_switch(false) {
}
kmp_flag_oncore(volatile kmp_uint64 *p, kmp_uint32 idx)
: kmp_flag_native<kmp_uint64>(p, flag_oncore), num_waiting_threads(0),
offset(idx), flag_switch(false) {}
: kmp_flag_native<kmp_uint64, flag_oncore, false>(p), offset(idx),
flag_switch(false),
bt(bs_last_barrier) USE_ITT_BUILD_ARG(itt_sync_obj(nullptr)) {}
kmp_flag_oncore(volatile kmp_uint64 *p, kmp_uint64 c, kmp_uint32 idx,
enum barrier_type bar_t,
kmp_info_t *thr USE_ITT_BUILD_ARG(void *itt))
: kmp_flag_native<kmp_uint64>(p, flag_oncore), checker(c),
num_waiting_threads(0), offset(idx), flag_switch(false), bt(bar_t),
: kmp_flag_native<kmp_uint64, flag_oncore, false>(p, c), offset(idx),
flag_switch(false), bt(bar_t),
this_thr(thr) USE_ITT_BUILD_ARG(itt_sync_obj(itt)) {}
kmp_info_t *get_waiter(kmp_uint32 i) {
KMP_DEBUG_ASSERT(i < num_waiting_threads);
return waiting_threads[i];
}
kmp_uint32 get_num_waiters() { return num_waiting_threads; }
void set_waiter(kmp_info_t *thr) {
waiting_threads[0] = thr;
num_waiting_threads = 1;
}
bool done_check_val(kmp_uint64 old_loc) {
virtual ~kmp_flag_oncore() override {}
void *operator new(size_t size) { return __kmp_allocate(size); }
void operator delete(void *p) { __kmp_free(p); }
bool done_check_val(kmp_uint64 old_loc) override {
return byteref(&old_loc, offset) == checker;
}
bool done_check() { return done_check_val(*get()); }
bool notdone_check() {
bool done_check() override { return done_check_val(*get()); }
bool notdone_check() override {
// Calculate flag_switch
if (this_thr->th.th_bar[bt].bb.wait_flag == KMP_BARRIER_SWITCH_TO_OWN_FLAG)
flag_switch = true;
@ -997,17 +985,6 @@ public:
KMP_TEST_THEN_OR64(get(), mask);
}
}
kmp_uint64 set_sleeping() {
return KMP_TEST_THEN_OR64(get(), KMP_BARRIER_SLEEP_STATE);
}
kmp_uint64 unset_sleeping() {
return KMP_TEST_THEN_AND64(get(), ~KMP_BARRIER_SLEEP_STATE);
}
bool is_sleeping_val(kmp_uint64 old_loc) {
return old_loc & KMP_BARRIER_SLEEP_STATE;
}
bool is_sleeping() { return is_sleeping_val(*get()); }
bool is_any_sleeping() { return is_sleeping_val(*get()); }
void wait(kmp_info_t *this_thr, int final_spin) {
if (final_spin)
__kmp_wait_template<kmp_flag_oncore, TRUE>(
@ -1038,27 +1015,39 @@ public:
thread_finished USE_ITT_BUILD_ARG(itt_sync_obj), is_constrained);
#endif
}
kmp_uint8 *get_stolen() { return NULL; }
enum barrier_type get_bt() { return bt; }
flag_type get_ptr_type() { return flag_oncore; }
};
// Used to wake up threads, volatile void* flag is usually the th_sleep_loc
// associated with int gtid.
static inline void __kmp_null_resume_wrapper(int gtid, volatile void *flag) {
static inline void __kmp_null_resume_wrapper(kmp_info_t *thr) {
int gtid = __kmp_gtid_from_thread(thr);
void *flag = CCAST(void *, thr->th.th_sleep_loc);
flag_type type = thr->th.th_sleep_loc_type;
if (!flag)
return;
switch (RCAST(kmp_flag_64<> *, CCAST(void *, flag))->get_type()) {
// Attempt to wake up a thread: examine its type and call appropriate template
switch (type) {
case flag32:
__kmp_resume_32(gtid, (kmp_flag_32<> *)NULL);
__kmp_resume_32(gtid, RCAST(kmp_flag_32<> *, flag));
break;
case flag64:
__kmp_resume_64(gtid, (kmp_flag_64<> *)NULL);
__kmp_resume_64(gtid, RCAST(kmp_flag_64<> *, flag));
break;
case atomic_flag64:
__kmp_atomic_resume_64(gtid, RCAST(kmp_atomic_flag_64<> *, flag));
break;
case flag_oncore:
__kmp_resume_oncore(gtid, (kmp_flag_oncore *)NULL);
__kmp_resume_oncore(gtid, RCAST(kmp_flag_oncore *, flag));
break;
#ifdef KMP_DEBUG
case flag_unset:
KF_TRACE(100, ("__kmp_null_resume_wrapper: flag type %d is unset\n", type));
break;
default:
KF_TRACE(100, ("__kmp_null_resume_wrapper: flag type %d does not match any "
"known flag type\n",
type));
#endif
}
}

View File

@ -1407,9 +1407,13 @@ static inline void __kmp_suspend_template(int th_gtid, C *flag) {
/* TODO: shouldn't this use release semantics to ensure that
__kmp_suspend_initialize_thread gets called first? */
old_spin = flag->set_sleeping();
TCW_PTR(th->th.th_sleep_loc, (void *)flag);
th->th.th_sleep_loc_type = flag->get_type();
if (__kmp_dflt_blocktime == KMP_MAX_BLOCKTIME &&
__kmp_pause_status != kmp_soft_paused) {
flag->unset_sleeping();
TCW_PTR(th->th.th_sleep_loc, NULL);
th->th.th_sleep_loc_type = flag_unset;
__kmp_unlock_suspend_mx(th);
return;
}
@ -1417,8 +1421,10 @@ static inline void __kmp_suspend_template(int th_gtid, C *flag) {
" was %x\n",
th_gtid, flag->get(), flag->load(), old_spin));
if (flag->done_check_val(old_spin)) {
old_spin = flag->unset_sleeping();
if (flag->done_check_val(old_spin) || flag->done_check()) {
flag->unset_sleeping();
TCW_PTR(th->th.th_sleep_loc, NULL);
th->th.th_sleep_loc_type = flag_unset;
KF_TRACE(5, ("__kmp_suspend_template: T#%d false alarm, reset sleep bit "
"for spin(%p)\n",
th_gtid, flag->get()));
@ -1427,7 +1433,6 @@ static inline void __kmp_suspend_template(int th_gtid, C *flag) {
"with low probability" return when the condition variable has
not been signaled or broadcast */
int deactivated = FALSE;
TCW_PTR(th->th.th_sleep_loc, (void *)flag);
while (flag->is_sleeping()) {
#ifdef DEBUG_SUSPEND
@ -1449,6 +1454,9 @@ static inline void __kmp_suspend_template(int th_gtid, C *flag) {
deactivated = TRUE;
}
KMP_DEBUG_ASSERT(th->th.th_sleep_loc);
KMP_DEBUG_ASSERT(flag->get_type() == th->th.th_sleep_loc_type);
#if USE_SUSPEND_TIMEOUT
struct timespec now;
struct timeval tval;
@ -1478,6 +1486,18 @@ static inline void __kmp_suspend_template(int th_gtid, C *flag) {
if ((status != 0) && (status != EINTR) && (status != ETIMEDOUT)) {
KMP_SYSFAIL("pthread_cond_wait", status);
}
KMP_DEBUG_ASSERT(flag->get_type() == flag->get_ptr_type());
if (!flag->is_sleeping() &&
((status == EINTR) || (status == ETIMEDOUT))) {
// if interrupt or timeout, and thread is no longer sleeping, we need to
// make sure sleep_loc gets reset; however, this shouldn't be needed if
// we woke up with resume
flag->unset_sleeping();
TCW_PTR(th->th.th_sleep_loc, NULL);
th->th.th_sleep_loc_type = flag_unset;
}
#ifdef KMP_DEBUG
if (status == ETIMEDOUT) {
if (flag->is_sleeping()) {
@ -1487,6 +1507,8 @@ static inline void __kmp_suspend_template(int th_gtid, C *flag) {
KF_TRACE(2, ("__kmp_suspend_template: T#%d timeout wakeup, sleep bit "
"not set!\n",
th_gtid));
TCW_PTR(th->th.th_sleep_loc, NULL);
th->th.th_sleep_loc_type = flag_unset;
}
} else if (flag->is_sleeping()) {
KF_TRACE(100,
@ -1504,6 +1526,13 @@ static inline void __kmp_suspend_template(int th_gtid, C *flag) {
}
}
}
// We may have had the loop variable set before entering the loop body;
// so we need to reset sleep_loc.
TCW_PTR(th->th.th_sleep_loc, NULL);
th->th.th_sleep_loc_type = flag_unset;
KMP_DEBUG_ASSERT(!flag->is_sleeping());
KMP_DEBUG_ASSERT(!th->th.th_sleep_loc);
#ifdef DEBUG_SUSPEND
{
char buffer[128];
@ -1525,6 +1554,10 @@ template <bool C, bool S>
void __kmp_suspend_64(int th_gtid, kmp_flag_64<C, S> *flag) {
__kmp_suspend_template(th_gtid, flag);
}
template <bool C, bool S>
void __kmp_atomic_suspend_64(int th_gtid, kmp_atomic_flag_64<C, S> *flag) {
__kmp_suspend_template(th_gtid, flag);
}
void __kmp_suspend_oncore(int th_gtid, kmp_flag_oncore *flag) {
__kmp_suspend_template(th_gtid, flag);
}
@ -1532,6 +1565,10 @@ void __kmp_suspend_oncore(int th_gtid, kmp_flag_oncore *flag) {
template void __kmp_suspend_32<false, false>(int, kmp_flag_32<false, false> *);
template void __kmp_suspend_64<false, true>(int, kmp_flag_64<false, true> *);
template void __kmp_suspend_64<true, false>(int, kmp_flag_64<true, false> *);
template void
__kmp_atomic_suspend_64<false, true>(int, kmp_atomic_flag_64<false, true> *);
template void
__kmp_atomic_suspend_64<true, false>(int, kmp_atomic_flag_64<true, false> *);
/* This routine signals the thread specified by target_gtid to wake up
after setting the sleep bit indicated by the flag argument to FALSE.
@ -1554,36 +1591,50 @@ static inline void __kmp_resume_template(int target_gtid, C *flag) {
__kmp_lock_suspend_mx(th);
if (!flag) { // coming from __kmp_null_resume_wrapper
if (!flag || flag != th->th.th_sleep_loc) {
// coming from __kmp_null_resume_wrapper, or thread is now sleeping on a
// different location; wake up at new location
flag = (C *)CCAST(void *, th->th.th_sleep_loc);
}
// First, check if the flag is null or its type has changed. If so, someone
// else woke it up.
if (!flag || flag->get_type() != flag->get_ptr_type()) { // get_ptr_type
// simply shows what flag was cast to
if (!flag) { // Thread doesn't appear to be sleeping on anything
KF_TRACE(5, ("__kmp_resume_template: T#%d exiting, thread T#%d already "
"awake: flag(%p)\n",
gtid, target_gtid, NULL));
gtid, target_gtid, (void *)NULL));
__kmp_unlock_suspend_mx(th);
return;
} else if (flag->get_type() != th->th.th_sleep_loc_type) {
// Flag type does not appear to match this function template; possibly the
// thread is sleeping on something else. Try null resume again.
KF_TRACE(
5,
("__kmp_resume_template: T#%d retrying, thread T#%d Mismatch flag(%p), "
"spin(%p) type=%d ptr_type=%d\n",
gtid, target_gtid, flag, flag->get(), flag->get_type(),
th->th.th_sleep_loc_type));
__kmp_unlock_suspend_mx(th);
__kmp_null_resume_wrapper(th);
return;
} else { // if multiple threads are sleeping, flag should be internally
// referring to a specific thread here
typename C::flag_t old_spin = flag->unset_sleeping();
if (!flag->is_sleeping_val(old_spin)) {
if (!flag->is_sleeping()) {
KF_TRACE(5, ("__kmp_resume_template: T#%d exiting, thread T#%d already "
"awake: flag(%p): "
"%u => %u\n",
gtid, target_gtid, flag->get(), old_spin, flag->load()));
"awake: flag(%p): %u\n",
gtid, target_gtid, flag->get(), (unsigned int)flag->load()));
__kmp_unlock_suspend_mx(th);
return;
}
KF_TRACE(5, ("__kmp_resume_template: T#%d about to wakeup T#%d, reset "
"sleep bit for flag's loc(%p): "
"%u => %u\n",
gtid, target_gtid, flag->get(), old_spin, flag->load()));
}
KMP_DEBUG_ASSERT(flag);
flag->unset_sleeping();
TCW_PTR(th->th.th_sleep_loc, NULL);
th->th.th_sleep_loc_type = flag_unset;
KF_TRACE(5, ("__kmp_resume_template: T#%d about to wakeup T#%d, reset "
"sleep bit for flag's loc(%p): %u\n",
gtid, target_gtid, flag->get(), (unsigned int)flag->load()));
#ifdef DEBUG_SUSPEND
{
@ -1609,12 +1660,19 @@ template <bool C, bool S>
void __kmp_resume_64(int target_gtid, kmp_flag_64<C, S> *flag) {
__kmp_resume_template(target_gtid, flag);
}
template <bool C, bool S>
void __kmp_atomic_resume_64(int target_gtid, kmp_atomic_flag_64<C, S> *flag) {
__kmp_resume_template(target_gtid, flag);
}
void __kmp_resume_oncore(int target_gtid, kmp_flag_oncore *flag) {
__kmp_resume_template(target_gtid, flag);
}
template void __kmp_resume_32<false, true>(int, kmp_flag_32<false, true> *);
template void __kmp_resume_32<false, false>(int, kmp_flag_32<false, false> *);
template void __kmp_resume_64<false, true>(int, kmp_flag_64<false, true> *);
template void
__kmp_atomic_resume_64<false, true>(int, kmp_atomic_flag_64<false, true> *);
#if KMP_USE_MONITOR
void __kmp_resume_monitor() {

View File

@ -240,13 +240,12 @@ static void __kmp_win32_cond_wait(kmp_win32_cond_t *cv, kmp_win32_mutex_t *mx,
continue;
}
// condition fulfilled, exiting
old_f = flag->unset_sleeping();
KMP_DEBUG_ASSERT(old_f & KMP_BARRIER_SLEEP_STATE);
flag->unset_sleeping();
TCW_PTR(th->th.th_sleep_loc, NULL);
KF_TRACE(50,
("__kmp_win32_cond_wait: exiting, condition "
"fulfilled: flag's loc(%p): %u => %u\n",
flag->get(), (unsigned int)old_f, (unsigned int)flag->load()));
th->th.th_sleep_loc_type = flag_unset;
KF_TRACE(50, ("__kmp_win32_cond_wait: exiting, condition "
"fulfilled: flag's loc(%p): %u\n",
flag->get(), (unsigned int)flag->load()));
__kmp_win32_mutex_lock(&cv->waiters_count_lock_);
KMP_DEBUG_ASSERT(cv->waiters_count_ > 0);
@ -376,9 +375,13 @@ static inline void __kmp_suspend_template(int th_gtid, C *flag) {
/* TODO: shouldn't this use release semantics to ensure that
__kmp_suspend_initialize_thread gets called first? */
old_spin = flag->set_sleeping();
TCW_PTR(th->th.th_sleep_loc, (void *)flag);
th->th.th_sleep_loc_type = flag->get_type();
if (__kmp_dflt_blocktime == KMP_MAX_BLOCKTIME &&
__kmp_pause_status != kmp_soft_paused) {
flag->unset_sleeping();
TCW_PTR(th->th.th_sleep_loc, NULL);
th->th.th_sleep_loc_type = flag_unset;
__kmp_unlock_suspend_mx(th);
return;
}
@ -387,8 +390,10 @@ static inline void __kmp_suspend_template(int th_gtid, C *flag) {
" loc(%p)==%u\n",
th_gtid, flag->get(), (unsigned int)flag->load()));
if (flag->done_check_val(old_spin)) {
old_spin = flag->unset_sleeping();
if (flag->done_check_val(old_spin) || flag->done_check()) {
flag->unset_sleeping();
TCW_PTR(th->th.th_sleep_loc, NULL);
th->th.th_sleep_loc_type = flag_unset;
KF_TRACE(5, ("__kmp_suspend_template: T#%d false alarm, reset sleep bit "
"for flag's loc(%p)\n",
th_gtid, flag->get()));
@ -400,7 +405,7 @@ static inline void __kmp_suspend_template(int th_gtid, C *flag) {
low probability" return when the condition variable has not been signaled
or broadcast */
int deactivated = FALSE;
TCW_PTR(th->th.th_sleep_loc, (void *)flag);
while (flag->is_sleeping()) {
KF_TRACE(15, ("__kmp_suspend_template: T#%d about to perform "
"kmp_win32_cond_wait()\n",
@ -415,13 +420,14 @@ static inline void __kmp_suspend_template(int th_gtid, C *flag) {
KMP_DEBUG_ASSERT(TCR_4(__kmp_thread_pool_active_nth) >= 0);
}
deactivated = TRUE;
__kmp_win32_cond_wait(&th->th.th_suspend_cv, &th->th.th_suspend_mx, th,
flag);
} else {
__kmp_win32_cond_wait(&th->th.th_suspend_cv, &th->th.th_suspend_mx, th,
flag);
}
KMP_DEBUG_ASSERT(th->th.th_sleep_loc);
KMP_DEBUG_ASSERT(th->th.th_sleep_loc_type == flag->get_type());
__kmp_win32_cond_wait(&th->th.th_suspend_cv, &th->th.th_suspend_mx, th,
flag);
#ifdef KMP_DEBUG
if (flag->is_sleeping()) {
KF_TRACE(100,
@ -431,6 +437,14 @@ static inline void __kmp_suspend_template(int th_gtid, C *flag) {
} // while
// We may have had the loop variable set before entering the loop body;
// so we need to reset sleep_loc.
TCW_PTR(th->th.th_sleep_loc, NULL);
th->th.th_sleep_loc_type = flag_unset;
KMP_DEBUG_ASSERT(!flag->is_sleeping());
KMP_DEBUG_ASSERT(!th->th.th_sleep_loc);
// Mark the thread as active again (if it was previous marked as inactive)
if (deactivated) {
th->th.th_active = TRUE;
@ -453,6 +467,10 @@ template <bool C, bool S>
void __kmp_suspend_64(int th_gtid, kmp_flag_64<C, S> *flag) {
__kmp_suspend_template(th_gtid, flag);
}
template <bool C, bool S>
void __kmp_atomic_suspend_64(int th_gtid, kmp_atomic_flag_64<C, S> *flag) {
__kmp_suspend_template(th_gtid, flag);
}
void __kmp_suspend_oncore(int th_gtid, kmp_flag_oncore *flag) {
__kmp_suspend_template(th_gtid, flag);
}
@ -460,6 +478,10 @@ void __kmp_suspend_oncore(int th_gtid, kmp_flag_oncore *flag) {
template void __kmp_suspend_32<false, false>(int, kmp_flag_32<false, false> *);
template void __kmp_suspend_64<false, true>(int, kmp_flag_64<false, true> *);
template void __kmp_suspend_64<true, false>(int, kmp_flag_64<true, false> *);
template void
__kmp_atomic_suspend_64<false, true>(int, kmp_atomic_flag_64<false, true> *);
template void
__kmp_atomic_suspend_64<true, false>(int, kmp_atomic_flag_64<true, false> *);
/* This routine signals the thread specified by target_gtid to wake up
after setting the sleep bit indicated by the flag argument to FALSE */
@ -477,32 +499,35 @@ static inline void __kmp_resume_template(int target_gtid, C *flag) {
__kmp_suspend_initialize_thread(th);
__kmp_lock_suspend_mx(th);
if (!flag) { // coming from __kmp_null_resume_wrapper
if (!flag || flag != th->th.th_sleep_loc) {
// coming from __kmp_null_resume_wrapper, or thread is now sleeping on a
// different location; wake up at new location
flag = (C *)th->th.th_sleep_loc;
}
// First, check if the flag is null or its type has changed. If so, someone
// else woke it up.
if (!flag || flag->get_type() != flag->get_ptr_type()) { // get_ptr_type
// simply shows what
// flag was cast to
if (!flag || flag->get_type() != th->th.th_sleep_loc_type) {
// simply shows what flag was cast to
KF_TRACE(5, ("__kmp_resume_template: T#%d exiting, thread T#%d already "
"awake: flag's loc(%p)\n",
gtid, target_gtid, NULL));
__kmp_unlock_suspend_mx(th);
return;
} else {
typename C::flag_t old_spin = flag->unset_sleeping();
if (!flag->is_sleeping_val(old_spin)) {
if (!flag->is_sleeping()) {
KF_TRACE(5, ("__kmp_resume_template: T#%d exiting, thread T#%d already "
"awake: flag's loc(%p): %u => %u\n",
gtid, target_gtid, flag->get(), (unsigned int)old_spin,
(unsigned int)flag->load()));
"awake: flag's loc(%p): %u\n",
gtid, target_gtid, flag->get(), (unsigned int)flag->load()));
__kmp_unlock_suspend_mx(th);
return;
}
}
KMP_DEBUG_ASSERT(flag);
flag->unset_sleeping();
TCW_PTR(th->th.th_sleep_loc, NULL);
th->th.th_sleep_loc_type = flag_unset;
KF_TRACE(5, ("__kmp_resume_template: T#%d about to wakeup T#%d, reset sleep "
"bit for flag's loc(%p)\n",
gtid, target_gtid, flag->get()));
@ -523,12 +548,19 @@ template <bool C, bool S>
void __kmp_resume_64(int target_gtid, kmp_flag_64<C, S> *flag) {
__kmp_resume_template(target_gtid, flag);
}
template <bool C, bool S>
void __kmp_atomic_resume_64(int target_gtid, kmp_atomic_flag_64<C, S> *flag) {
__kmp_resume_template(target_gtid, flag);
}
void __kmp_resume_oncore(int target_gtid, kmp_flag_oncore *flag) {
__kmp_resume_template(target_gtid, flag);
}
template void __kmp_resume_32<false, true>(int, kmp_flag_32<false, true> *);
template void __kmp_resume_32<false, false>(int, kmp_flag_32<false, false> *);
template void __kmp_resume_64<false, true>(int, kmp_flag_64<false, true> *);
template void
__kmp_atomic_resume_64<false, true>(int, kmp_atomic_flag_64<false, true> *);
void __kmp_yield() { Sleep(0); }

View File

@ -2,6 +2,8 @@
// RUN: %libomp-compile && env KMP_BLOCKTIME=infinite %libomp-run
// RUN: %libomp-compile && env KMP_PLAIN_BARRIER_PATTERN='hierarchical,hierarchical' KMP_FORKJOIN_BARRIER_PATTERN='hierarchical,hierarchical' %libomp-run
// RUN: %libomp-compile && env KMP_BLOCKTIME=infinite KMP_PLAIN_BARRIER_PATTERN='hierarchical,hierarchical' KMP_FORKJOIN_BARRIER_PATTERN='hierarchical,hierarchical' %libomp-run
// RUN: %libomp-compile && env KMP_PLAIN_BARRIER_PATTERN='dist,dist' KMP_FORKJOIN_BARRIER_PATTERN='dist,dist' KMP_REDUCTION_BARRIER_PATTERN='dist,dist' %libomp-run
// RUN: %libomp-compile && env KMP_BLOCKTIME=infinite KMP_PLAIN_BARRIER_PATTERN='dist,dist' KMP_FORKJOIN_BARRIER_PATTERN='dist,dist' KMP_REDUCTION_BARRIER_PATTERN='dist,dist' %libomp-run
#include <stdio.h>
#include "omp_testsuite.h"
#include "omp_my_sleep.h"