Run-time library part of OpenMP 5.0 task reduction implementation.

Added test kmp_task_reduction_nest.cpp which has an example of
possible compiler codegen.

Differential Revision: https://reviews.llvm.org/D29600

llvm-svn: 295343
This commit is contained in:
Andrey Churbanov 2017-02-16 17:49:49 +00:00
parent 4569fee0c6
commit 72ba210916
4 changed files with 602 additions and 1 deletions

View File

@ -397,6 +397,15 @@ kmpc_set_defaults 224
kmpc_aligned_malloc 265
kmpc_set_disp_num_buffers 267
# OpenMP 5.0 entry points
# TODO: change to OMP_50 once it is implemented
%ifndef stub
%ifdef OMP_45
__kmpc_task_reduction_init 268
__kmpc_task_reduction_get_th_data 269
%endif
%endif
# User API entry points that have both lower- and upper- case versions for Fortran.
# Number for lowercase version is indicated. Number for uppercase is obtained by adding 1000.
# User API entry points are entry points that start with 'kmp_' or 'omp_'.

View File

@ -1979,9 +1979,14 @@ typedef struct kmp_taskgroup {
kmp_uint32 count; // number of allocated and not yet complete tasks
kmp_int32 cancel_request; // request for cancellation of this taskgroup
struct kmp_taskgroup *parent; // parent taskgroup
// TODO: change to OMP_50_ENABLED, need to change build tools for this to work
#if OMP_45_ENABLED
// Block of data to perform task reduction
void *reduce_data; // reduction related info
kmp_int32 reduce_num_data; // number of data items to reduce
#endif
} kmp_taskgroup_t;
// forward declarations
typedef union kmp_depnode kmp_depnode_t;
typedef struct kmp_depnode_list kmp_depnode_list_t;
@ -3421,6 +3426,11 @@ KMP_EXPORT void __kmpc_taskloop(ident_t *loc, kmp_int32 gtid, kmp_task_t *task,
kmp_uint64 *lb, kmp_uint64 *ub, kmp_int64 st,
kmp_int32 nogroup, kmp_int32 sched, kmp_uint64 grainsize, void * task_dup );
#endif
// TODO: change to OMP_50_ENABLED, need to change build tools for this to work
#if OMP_45_ENABLED
// Sets up task reduction for the current taskgroup of thread `gtid`.
// `data` is an array of `num_data` per-item descriptors supplied by the
// compiler; the returned value is the taskgroup, as an opaque pointer.
KMP_EXPORT void* __kmpc_task_reduction_init(int gtid, int num_data, void *data);
// Returns the location of reduction item `d` that belongs to thread `gtid`.
// The search starts at taskgroup `tg` (the thread's current taskgroup when
// `tg` is NULL) and continues through parent taskgroups. `d` is returned
// unchanged when the team has a single thread.
KMP_EXPORT void* __kmpc_task_reduction_get_th_data(int gtid, void *tg, void *d);
#endif
#endif

View File

@ -1596,6 +1596,204 @@ __kmpc_omp_taskyield( ident_t *loc_ref, kmp_int32 gtid, int end_part )
return TASK_CURRENT_NOT_QUEUED;
}
// TODO: change to OMP_50_ENABLED, need to change build tools for this to work
#if OMP_45_ENABLED
//
// Task Reduction implementation
//
// Per-item hint flags that accompany every reduction item descriptor.
// The two bit-fields together span 32 bits.
typedef struct kmp_task_red_flags {
unsigned lazy_priv : 1; // hint: (1) use lazy allocation (big objects)
unsigned reserved31 : 31; // remaining bits; not read by the task reduction code in this section
} kmp_task_red_flags_t;
// Internal structure for reduction data item related info.
// __kmpc_task_reduction_init() allocates an array of these (one element per
// reduction item) and stores it in the taskgroup's reduce_data field;
// __kmp_task_reduction_fini() releases it.
typedef struct kmp_task_red_data {
void *reduce_shar; // shared reduction item
size_t reduce_size; // size of data item, stored rounded up to a multiple of CACHE_LINE
void *reduce_priv; // thread specific data: one block of per-thread items, or (lazy_priv) an array of per-thread pointers
void *reduce_pend; // end of private data for comparison op; only assigned when lazy_priv is 0
void *reduce_init; // data initialization routine, called as void(*)(void*); may be NULL
void *reduce_fini; // data finalization routine, called as void(*)(void*); may be NULL
void *reduce_comb; // data combiner routine, called as void(*)(void*,void*); mandatory
kmp_task_red_flags_t flags; // flags for additional info from compiler
} kmp_task_red_data_t;
// Structure passed to the runtime by the compiler - one per reduction item.
// __kmpc_task_reduction_init() interprets its `data` argument as an array of
// these and copies every field into the matching kmp_task_red_data_t.
// NOTE(review): the field order and sizes presumably have to match what the
// compiler emits - confirm before changing the layout.
typedef struct kmp_task_red_input {
void *reduce_shar; // shared reduction item
size_t reduce_size; // size of data item
void *reduce_init; // data initialization routine (may be NULL)
void *reduce_fini; // data finalization routine (may be NULL)
void *reduce_comb; // data combiner routine (asserted to be non-NULL by the runtime)
kmp_task_red_flags_t flags; // flags for additional info from compiler
} kmp_task_red_input_t;
/*!
@ingroup TASKING
@param gtid Global thread ID
@param num Number of data items to reduce
@param data Array of data for reduction
@return The taskgroup identifier
Initialize task reduction for the taskgroup.
*/
void*
__kmpc_task_reduction_init(int gtid, int num, void *data)
{
kmp_info_t * thread = __kmp_threads[gtid];
kmp_taskgroup_t * tg = thread->th.th_current_task->td_taskgroup;
kmp_int32 nth = thread->th.th_team_nproc;
kmp_task_red_input_t *input = (kmp_task_red_input_t*)data;
kmp_task_red_data_t *arr;
// check input data just in case
KMP_ASSERT(tg != NULL);
KMP_ASSERT(data != NULL);
KMP_ASSERT(num > 0);
if (nth == 1) {
KA_TRACE(10, ("__kmpc_task_reduction_init: T#%d, tg %p, exiting nth=1\n",
gtid, tg));
return (void*)tg;
}
KA_TRACE(10,("__kmpc_task_reduction_init: T#%d, taskgroup %p, #items %d\n",
gtid, tg, num));
arr = (kmp_task_red_data_t*)__kmp_thread_malloc(thread, num * sizeof(kmp_task_red_data_t));
for (int i = 0; i < num; ++i) {
void(*f_init)(void*) = (void(*)(void*))(input[i].reduce_init);
size_t size = input[i].reduce_size - 1;
// round the size up to cache line per thread-specific item
size += CACHE_LINE - size % CACHE_LINE;
KMP_ASSERT(input[i].reduce_comb != NULL); // combiner is mandatory
arr[i].reduce_shar = input[i].reduce_shar;
arr[i].reduce_size = size;
arr[i].reduce_init = input[i].reduce_init;
arr[i].reduce_fini = input[i].reduce_fini;
arr[i].reduce_comb = input[i].reduce_comb;
arr[i].flags = input[i].flags;
if (!input[i].flags.lazy_priv) {
// allocate cache-line aligned block and fill it with zeros
arr[i].reduce_priv = __kmp_allocate(nth * size);
arr[i].reduce_pend = (char*)(arr[i].reduce_priv) + nth * size;
if (f_init != NULL) {
// initialize thread-specific items
for (int j = 0; j < nth; ++j) {
f_init((char*)(arr[i].reduce_priv) + j * size);
}
}
} else {
// only allocate space for pointers now,
// objects will be lazily allocated/initialized once requested
arr[i].reduce_priv = __kmp_allocate(nth * sizeof(void*));
}
}
tg->reduce_data = (void*)arr;
tg->reduce_num_data = num;
return (void*)tg;
}
/*!
@ingroup TASKING
@param gtid    Global thread ID
@param tskgrp  The taskgroup ID (optional)
@param data    Shared location of the item
@return The pointer to per-thread data

Get thread-specific location of data item.

`data` may be the shared item itself or a thread-specific location that was
returned earlier (possibly to another thread). The lookup starts at `tskgrp`
(or at the current taskgroup of the calling thread when `tskgrp` is NULL) and
proceeds through the parent taskgroups. If the team has a single thread,
`data` is returned unchanged. If no taskgroup in the chain knows the item,
the "Unknown task reduction item" assertion fires.
*/
void*
__kmpc_task_reduction_get_th_data(int gtid, void *tskgrp, void *data)
{
    kmp_info_t *thread = __kmp_threads[gtid];
    kmp_int32 nth = thread->th.th_team_nproc;
    if (nth == 1)
        return data; // nothing to do

    kmp_taskgroup_t *tg = (kmp_taskgroup_t*)tskgrp;
    if (tg == NULL)
        tg = thread->th.th_current_task->td_taskgroup;
    KMP_ASSERT(tg != NULL);
    KMP_ASSERT(data != NULL);

    kmp_int32 tid = thread->th.th_info.ds.ds_tid;

    // Walk from the starting taskgroup towards the outermost one. The
    // reduction info is re-read only after tg is known to be non-NULL, so an
    // unknown item ends in the assertion below rather than in a NULL
    // dereference of the outermost taskgroup's parent.
    for (; tg != NULL; tg = tg->parent) {
        kmp_task_red_data_t *arr = (kmp_task_red_data_t*)(tg->reduce_data);
        kmp_int32 num = tg->reduce_num_data;
        if (arr == NULL)
            continue; // this taskgroup performs no reduction

        for (int i = 0; i < num; ++i) {
            if (!arr[i].flags.lazy_priv) {
                // eager items: match the shared address or any address
                // inside the block of thread-specific copies
                if (data == arr[i].reduce_shar ||
                    (data >= arr[i].reduce_priv && data < arr[i].reduce_pend))
                    return (char*)(arr[i].reduce_priv) + tid * arr[i].reduce_size;
                continue;
            }

            // lazy items: check shared location first, then check if we got
            // some thread specific location as parameter
            void **p_priv = (void**)(arr[i].reduce_priv);
            int found = (data == arr[i].reduce_shar);
            for (int j = 0; !found && j < nth; ++j)
                found = (data == p_priv[j]);
            if (!found)
                continue; // not found, continue search

            if (p_priv[tid] == NULL) {
                // allocate thread specific object lazily
                void (*f_init)(void*) = (void(*)(void*))(arr[i].reduce_init);
                p_priv[tid] = __kmp_allocate(arr[i].reduce_size);
                if (f_init != NULL) {
                    f_init(p_priv[tid]);
                }
            }
            return p_priv[tid];
        }
    }
    KMP_ASSERT2(0, "Unknown task reduction item");
    return NULL; // ERROR, this line never executed
}
// Finalize task reduction.
// Called from __kmpc_end_taskgroup()
//
// For every reduction item of the taskgroup the thread-specific copies are
// combined into the shared item (and finalized if a finalizer was given),
// then the per-item storage and the bookkeeping array are released and the
// taskgroup's reduction fields are reset.
static void
__kmp_task_reduction_fini(kmp_info_t *th, kmp_taskgroup_t *tg)
{
    kmp_int32 nth = th->th.th_team_nproc;
    KMP_DEBUG_ASSERT(nth > 1); // should not be called if nth == 1

    kmp_task_red_data_t *records = (kmp_task_red_data_t*)tg->reduce_data;
    const kmp_int32 count = tg->reduce_num_data;

    for (kmp_int32 item = 0; item < count; ++item) {
        kmp_task_red_data_t *rec = &records[item];
        void *shared = rec->reduce_shar;
        void (*combine)(void*, void*) = (void(*)(void*,void*))(rec->reduce_comb);
        void (*finalize)(void*) = (void(*)(void*))(rec->reduce_fini);

        if (rec->flags.lazy_priv) {
            // lazily allocated objects: only threads that requested the
            // item have a non-NULL slot
            void **slots = (void**)(rec->reduce_priv);
            for (kmp_int32 t = 0; t < nth; ++t) {
                void *obj = slots[t];
                if (obj == NULL)
                    continue;
                combine(shared, obj); // combine results
                if (finalize)
                    finalize(obj); // finalize if needed
                __kmp_free(obj);
            }
        } else {
            // eagerly allocated block: one padded slot per thread
            char *slot = (char*)rec->reduce_priv;
            const size_t stride = rec->reduce_size;
            for (kmp_int32 t = 0; t < nth; ++t, slot += stride) {
                combine(shared, slot); // combine results
                if (finalize)
                    finalize(slot); // finalize if needed
            }
        }
        __kmp_free(rec->reduce_priv);
    }

    __kmp_thread_free(th, records);
    tg->reduce_data = NULL;
    tg->reduce_num_data = 0;
}
#endif
#if OMP_40_ENABLED
//-------------------------------------------------------------------------------------
@ -1612,6 +1810,11 @@ __kmpc_taskgroup( ident_t* loc, int gtid )
tg_new->count = 0;
tg_new->cancel_request = cancel_noreq;
tg_new->parent = taskdata->td_taskgroup;
// TODO: change to OMP_50_ENABLED, need to change build tools for this to work
#if OMP_45_ENABLED
tg_new->reduce_data = NULL;
tg_new->reduce_num_data = 0;
#endif
taskdata->td_taskgroup = tg_new;
}
@ -1660,6 +1863,11 @@ __kmpc_end_taskgroup( ident_t* loc, int gtid )
}
KMP_DEBUG_ASSERT( taskgroup->count == 0 );
// TODO: change to OMP_50_ENABLED, need to change build tools for this to work
#if OMP_45_ENABLED
if( taskgroup->reduce_data != NULL ) // need to reduce?
__kmp_task_reduction_fini(thread, taskgroup);
#endif
// Restore parent taskgroup for the current task
taskdata->td_taskgroup = taskgroup->parent;
__kmp_thread_free( thread, taskgroup );

View File

@ -0,0 +1,374 @@
// RUN: %libomp-compile-and-run
// RUN: %libomp-compile -DFLG=1 && %libomp-run
#include <cassert>
#include <cmath>
#include <cstdio>
#include <cstdlib>
#include <omp.h>
// Total number of loop iterations; should be a multiple of the number of
// iterations handled per task (2 in this test)
#define N 10000
// Flag to request lazy (1) or eager (0) allocation of reduction objects
#ifndef FLG
#define FLG 0
#endif
/*
// initial user's code that corresponds to pseudo code of the test
#pragma omp taskgroup task_reduction(+:i,j) task_reduction(*:x)
{
for( int l = 0; l < N; ++l ) {
#pragma omp task firstprivate(l) in_reduction(+:i) in_reduction(*:x)
{
i += l;
if( l%2 )
x *= 1.0 / (l + 1);
else
x *= (l + 1);
}
}
#pragma omp taskgroup task_reduction(-:i,k) task_reduction(+:y)
{
for( int l = 0; l < N; ++l ) {
#pragma omp task firstprivate(l) in_reduction(+:j,y) \
in_reduction(*:x) in_reduction(-:k)
{
j += l;
k -= l;
y += (double)l;
if( l%2 )
x *= 1.0 / (l + 1);
else
x *= (l + 1);
}
#pragma omp task firstprivate(l) in_reduction(+:y) in_reduction(-:i,k)
{
i -= l;
k -= l;
y += (double)l;
}
#pragma omp task firstprivate(l) in_reduction(+:j) in_reduction(*:x)
{
j += l;
if( l%2 )
x *= 1.0 / (l + 1);
else
x *= (l + 1);
}
}
} // inner reduction
for( int l = 0; l < N; ++l ) {
#pragma omp task firstprivate(l) in_reduction(+:j)
j += l;
}
} // outer reduction
*/
//------------------------------------------------
// OpenMP runtime library routines
#ifdef __cplusplus
extern "C" {
#endif
extern void* __kmpc_task_reduction_get_th_data(int gtid, void* tg, void* item);
extern void* __kmpc_task_reduction_init(int gtid, int num, void* data);
extern int __kmpc_global_thread_num(void*);
#ifdef __cplusplus
}
#endif
//------------------------------------------------
// Compiler-generated code
// Per-item reduction descriptor as compiler-generated code would build it.
// Arrays of these are passed as the `data` argument of
// __kmpc_task_reduction_init().
// NOTE(review): the field order presumably has to mirror the runtime's input
// structure - confirm before changing the layout.
typedef struct _task_red_item {
void *shar; // shared reduction item
size_t size; // size of data item
void *f_init; // data initialization routine
void *f_fini; // data finalization routine
void *f_comb; // data combiner routine
unsigned flags; // hint flags; the test stores FLG here (1 = lazy, 0 = eager allocation)
} _task_red_item_t;
// Combiner for `int` sum reductions: adds *rhs into *lhs.
// No init/fini callbacks are needed; the same routine is valid for
// subtraction reductions.
void __red_int_add_comb(void *lhs, void *rhs)
{
    int *total = (int*)lhs;
    const int *part = (const int*)rhs;
    *total = *total + *part;
}
// Combiner for `long long` sum reductions: adds *rhs into *lhs.
// No init/fini callbacks are needed; the same routine is valid for
// subtraction reductions.
void __red_llong_add_comb(void *lhs, void *rhs)
{
    long long *total = (long long*)lhs;
    const long long *part = (const long long*)rhs;
    *total = *total + *part;
}
// Initializer for `double` product reductions: stores the multiplicative
// identity into the thread-specific object. No fini callback is needed.
void __red_dbl_mul_init(void *data)
{
    double *item = (double*)data;
    *item = 1.0;
}
// Combiner for `double` product reductions: multiplies *lhs by *rhs.
void __red_dbl_mul_comb(void *lhs, void *rhs)
{
    double *product = (double*)lhs;
    const double *factor = (const double*)rhs;
    *product = *product * *factor;
}
// Combiner for `double` sum reductions: adds *rhs into *lhs.
// No init/fini callbacks are needed.
void __red_dbl_add_comb(void *lhs, void *rhs)
{
    double *total = (double*)lhs;
    const double *part = (const double*)rhs;
    *total = *total + *part;
}
// ==============================
// Serial reference for the reductions performed by the test: applies to
// *pi, *pj, *px, *pk, *py the same updates that the tasks of the outer and
// inner taskgroups apply, in three passes of N iterations each.
void calc_serial(int *pi, long long *pj, double *px, long long *pk, double *py)
{
    // pass 1: tasks created before the inner taskgroup (i and x)
    for (int l = 0; l < N; ++l) {
        const double factor = (l % 2) ? 1.0 / (l + 1) : (double)(l + 1);
        *pi += l;
        *px *= factor;
    }

    // pass 2: the three tasks created per iteration inside the inner taskgroup
    for (int l = 0; l < N; ++l) {
        const double factor = (l % 2) ? 1.0 / (l + 1) : (double)(l + 1);

        // first task: j, k, y and x
        *pj += l;
        *pk -= l;
        *py += (double)l;
        *px *= factor;

        // second task: i, k and y
        *pi -= l;
        *pk -= l;
        *py += (double)l;

        // third task: j and x
        *pj += l;
        *px *= factor;
    }

    // pass 3: tasks created after the inner taskgroup (j only)
    for (int l = 0; l < N; ++l) {
        *pj += l;
    }
}
//------------------------------------------------
// Test case
// Test driver.
// Runs the reductions serially (calc_serial) and in parallel through the
// runtime entry points, hand-written the way a compiler might generate them,
// then compares the results. It also checks that
// __kmpc_task_reduction_get_th_data() resolves the shared address, the
// thread's own private address and another thread's private address to the
// same thread-specific location.
// Returns 0 when everything matches, 1 otherwise, so that a wrong result
// fails the RUN line instead of only being printed.
int main()
{
    int nthreads = omp_get_max_threads();
    int err = 0;
    // per-thread record of one thread-specific address of x, indexed by the
    // thread number within the team
    void** ptrs = (void**)malloc(nthreads*sizeof(void*));
    if (ptrs == NULL) {
        printf("failed, cannot allocate %d pointers\n", nthreads);
        return 1;
    }

    // user's code ======================================
    // variables for serial calculations:
    int is = 3;
    long long js = -9999999;
    double xs = 99999.0;
    long long ks = 99999999;
    double ys = -99999999.0;
    // variables for parallel calculations:
    int ip = 3;
    long long jp = -9999999;
    double xp = 99999.0;
    long long kp = 99999999;
    double yp = -99999999.0;

    calc_serial(&is, &js, &xs, &ks, &ys);
    // ==================================================
    for (int i = 0; i < nthreads; ++i)
        ptrs[i] = NULL;
    #pragma omp parallel
    {
        #pragma omp single nowait
        {
            // outer taskgroup reduces (i,j,x)
            #pragma omp taskgroup // task_reduction(+:i,j) task_reduction(*:x)
            {
                _task_red_item_t red_data[3];
                red_data[0].shar = &ip;
                red_data[0].size = sizeof(ip);
                red_data[0].f_init = NULL; // RTL will zero thread-specific objects
                red_data[0].f_fini = NULL; // no destructors needed
                red_data[0].f_comb = (void*)&__red_int_add_comb;
                red_data[0].flags = FLG;
                red_data[1].shar = &jp;
                red_data[1].size = sizeof(jp);
                red_data[1].f_init = NULL; // RTL will zero thread-specific objects
                red_data[1].f_fini = NULL; // no destructors needed
                red_data[1].f_comb = (void*)&__red_llong_add_comb;
                red_data[1].flags = FLG;
                red_data[2].shar = &xp;
                red_data[2].size = sizeof(xp);
                red_data[2].f_init = (void*)&__red_dbl_mul_init;
                red_data[2].f_fini = NULL; // no destructors needed
                red_data[2].f_comb = (void*)&__red_dbl_mul_comb;
                red_data[2].flags = FLG;
                int gtid = __kmpc_global_thread_num(NULL);
                void* tg1 = __kmpc_task_reduction_init(gtid, 3, red_data);

                for( int l = 0; l < N; l += 2 ) {
                    // 2 iterations per task to get correct x value; actually any even
                    // number of iters per task will work, otherwise x loses precision
                    #pragma omp task firstprivate(l) //in_reduction(+:i) in_reduction(*:x)
                    {
                        int gtid = __kmpc_global_thread_num(NULL);
                        int *p_ip = (int*)__kmpc_task_reduction_get_th_data(gtid, tg1, &ip);
                        double *p_xp = (double*)__kmpc_task_reduction_get_th_data(
                                           gtid, tg1, &xp);
                        // ptrs has one slot per team thread and is read below
                        // by thread number, so it is written by thread number too
                        int tid = omp_get_thread_num();
                        if (!ptrs[tid]) ptrs[tid] = p_xp;

                        // user's pseudo-code ==============================
                        *p_ip += l;
                        *p_xp *= (l + 1);

                        *p_ip += l + 1;
                        *p_xp *= 1.0 / (l + 2);
                        // ==================================================
                    }
                }
                // inner taskgroup reduces (i,k,y), i is same object as in outer one
                #pragma omp taskgroup // task_reduction(-:i,k) task_reduction(+:y)
                {
                    _task_red_item_t red_data[3];
                    red_data[0].shar = &ip;
                    red_data[0].size = sizeof(ip);
                    red_data[0].f_init = NULL; // RTL will zero thread-specific objects
                    red_data[0].f_fini = NULL; // no destructors needed
                    red_data[0].f_comb = (void*)&__red_int_add_comb;
                    red_data[0].flags = FLG;
                    red_data[1].shar = &kp;
                    red_data[1].size = sizeof(kp);
                    red_data[1].f_init = NULL; // RTL will zero thread-specific objects
                    red_data[1].f_fini = NULL; // no destructors needed
                    red_data[1].f_comb = (void*)&__red_llong_add_comb; // same for + and -
                    red_data[1].flags = FLG;
                    red_data[2].shar = &yp;
                    red_data[2].size = sizeof(yp);
                    red_data[2].f_init = NULL; // RTL will zero thread-specific objects
                    red_data[2].f_fini = NULL; // no destructors needed
                    red_data[2].f_comb = (void*)&__red_dbl_add_comb;
                    red_data[2].flags = FLG;
                    int gtid = __kmpc_global_thread_num(NULL);
                    void* tg2 = __kmpc_task_reduction_init(gtid, 3, red_data);

                    for( int l = 0; l < N; l += 2 ) {
                        #pragma omp task firstprivate(l)
                        // in_reduction(+:j,y) in_reduction(*:x) in_reduction(-:k)
                        {
                            int gtid = __kmpc_global_thread_num(NULL);
                            long long *p_jp = (long long*)__kmpc_task_reduction_get_th_data(
                                                  gtid, tg1, &jp);
                            long long *p_kp = (long long*)__kmpc_task_reduction_get_th_data(
                                                  gtid, tg2, &kp);
                            double *p_xp = (double*)__kmpc_task_reduction_get_th_data(
                                               gtid, tg1, &xp);
                            double *p_yp = (double*)__kmpc_task_reduction_get_th_data(
                                               gtid, tg2, &yp);
                            // user's pseudo-code ==============================
                            *p_jp += l;
                            *p_kp -= l;
                            *p_yp += (double)l;
                            *p_xp *= (l + 1);

                            *p_jp += l + 1;
                            *p_kp -= l + 1;
                            *p_yp += (double)(l + 1);
                            *p_xp *= 1.0 / (l + 2);
                            // =================================================
                            {
                                // the following code is here just to check __kmpc_task_reduction_get_th_data:
                                int tid = omp_get_thread_num();
                                void *addr1;
                                void *addr2;
                                addr1 = __kmpc_task_reduction_get_th_data(gtid, tg1, &xp); // from shared
                                addr2 = __kmpc_task_reduction_get_th_data(gtid, tg1, addr1); // from private
                                if (addr1 != addr2) {
                                    #pragma omp atomic
                                    ++err;
                                    printf("Wrong thread-specific addresses %d s:%p p:%p\n", tid, addr1, addr2);
                                }
                                // from neighbour w/o taskgroup (should start lookup from current tg2)
                                if (tid > 0) {
                                    if (ptrs[tid-1]) {
                                        addr2 = __kmpc_task_reduction_get_th_data(gtid, NULL, ptrs[tid-1]);
                                        if (addr1 != addr2) {
                                            #pragma omp atomic
                                            ++err;
                                            printf("Wrong thread-specific addresses %d s:%p n:%p\n",
                                                   tid, addr1, addr2);
                                        }
                                    }
                                } else {
                                    if (ptrs[nthreads-1]) {
                                        addr2 = __kmpc_task_reduction_get_th_data(gtid, NULL, ptrs[nthreads-1]);
                                        if (addr1 != addr2) {
                                            #pragma omp atomic
                                            ++err;
                                            printf("Wrong thread-specific addresses %d s:%p n:%p\n",
                                                   tid, addr1, addr2);
                                        }
                                    }
                                }
                                // ----------------------------------------------
                            }
                        }
                        #pragma omp task firstprivate(l)
                        // in_reduction(+:y) in_reduction(-:i,k)
                        {
                            int gtid = __kmpc_global_thread_num(NULL);
                            int *p_ip = (int*)__kmpc_task_reduction_get_th_data(
                                            gtid, tg2, &ip);
                            long long *p_kp = (long long*)__kmpc_task_reduction_get_th_data(
                                                  gtid, tg2, &kp);
                            double *p_yp = (double*)__kmpc_task_reduction_get_th_data(
                                               gtid, tg2, &yp);

                            // user's pseudo-code ==============================
                            *p_ip -= l;
                            *p_kp -= l;
                            *p_yp += (double)l;

                            *p_ip -= l + 1;
                            *p_kp -= l + 1;
                            *p_yp += (double)(l + 1);
                            // =================================================
                        }
                        #pragma omp task firstprivate(l)
                        // in_reduction(+:j) in_reduction(*:x)
                        {
                            int gtid = __kmpc_global_thread_num(NULL);
                            long long *p_jp = (long long*)__kmpc_task_reduction_get_th_data(
                                                  gtid, tg1, &jp);
                            double *p_xp = (double*)__kmpc_task_reduction_get_th_data(
                                               gtid, tg1, &xp);
                            // user's pseudo-code ==============================
                            *p_jp += l;
                            *p_xp *= (l + 1);

                            *p_jp += l + 1;
                            *p_xp *= 1.0 / (l + 2);
                            // =================================================
                        }
                    }
                } // inner reduction

                for( int l = 0; l < N; l += 2 ) {
                    #pragma omp task firstprivate(l) // in_reduction(+:j)
                    {
                        int gtid = __kmpc_global_thread_num(NULL);
                        long long *p_jp = (long long*)__kmpc_task_reduction_get_th_data(
                                              gtid, tg1, &jp);
                        // user's pseudo-code ==============================
                        *p_jp += l;
                        *p_jp += l + 1;
                        // =================================================
                    }
                }
            } // outer reduction
        } // end single
    } // end parallel
    // check results
#if _DEBUG
    printf("reduction flags = %u\n", FLG);
#endif
    int failed = 0;
    if (err != 0) {
        // address lookups went wrong even if the reduced values happen to match
        printf("failed, %d wrong thread-specific address(es) detected\n", err);
        failed = 1;
    }
    if (ip == is && jp == js && ks == kp &&
        fabs(xp - xs) < 0.01 && fabs(yp - ys) < 0.01) {
        if (!failed)
            printf("passed\n");
    } else {
        printf("failed,\n ser:(%d %lld %f %lld %f)\n par:(%d %lld %f %lld %f)\n",
               is, js, xs, ks, ys,
               ip, jp, xp, kp, yp);
        failed = 1;
    }
    free(ptrs);
    return failed;
}