diff --git a/openmp/runtime/src/dllexports b/openmp/runtime/src/dllexports index 955a9ef45b5d..81839c1c5454 100644 --- a/openmp/runtime/src/dllexports +++ b/openmp/runtime/src/dllexports @@ -397,6 +397,15 @@ kmpc_set_defaults 224 kmpc_aligned_malloc 265 kmpc_set_disp_num_buffers 267 +# OpenMP 5.0 entry points +# TODO: change to OMP_50 once it is implemented +%ifndef stub + %ifdef OMP_45 + __kmpc_task_reduction_init 268 + __kmpc_task_reduction_get_th_data 269 + %endif +%endif + # User API entry points that have both lower- and upper- case versions for Fortran. # Number for lowercase version is indicated. Number for uppercase is obtained by adding 1000. # User API entry points are entry points that start with 'kmp_' or 'omp_'. diff --git a/openmp/runtime/src/kmp.h b/openmp/runtime/src/kmp.h index da29338578ab..fb2da37ad62d 100644 --- a/openmp/runtime/src/kmp.h +++ b/openmp/runtime/src/kmp.h @@ -1979,9 +1979,14 @@ typedef struct kmp_taskgroup { kmp_uint32 count; // number of allocated and not yet complete tasks kmp_int32 cancel_request; // request for cancellation of this taskgroup struct kmp_taskgroup *parent; // parent taskgroup +// TODO: change to OMP_50_ENABLED, need to change build tools for this to work +#if OMP_45_ENABLED + // Block of data to perform task reduction + void *reduce_data; // reduction related info + kmp_int32 reduce_num_data; // number of data items to reduce +#endif } kmp_taskgroup_t; - // forward declarations typedef union kmp_depnode kmp_depnode_t; typedef struct kmp_depnode_list kmp_depnode_list_t; @@ -3421,6 +3426,11 @@ KMP_EXPORT void __kmpc_taskloop(ident_t *loc, kmp_int32 gtid, kmp_task_t *task, kmp_uint64 *lb, kmp_uint64 *ub, kmp_int64 st, kmp_int32 nogroup, kmp_int32 sched, kmp_uint64 grainsize, void * task_dup ); #endif +// TODO: change to OMP_50_ENABLED, need to change build tools for this to work +#if OMP_45_ENABLED +KMP_EXPORT void* __kmpc_task_reduction_init(int gtid, int num_data, void *data); +KMP_EXPORT void* __kmpc_task_reduction_get_th_data(int gtid, void *tg, void *d); +#endif #endif diff --git a/openmp/runtime/src/kmp_tasking.cpp b/openmp/runtime/src/kmp_tasking.cpp index 5965822b71f0..d3cf1cce03ba 100644 --- a/openmp/runtime/src/kmp_tasking.cpp +++ b/openmp/runtime/src/kmp_tasking.cpp @@ -1596,6 +1596,204 @@ __kmpc_omp_taskyield( ident_t *loc_ref, kmp_int32 gtid, int end_part ) return TASK_CURRENT_NOT_QUEUED; } +// TODO: change to OMP_50_ENABLED, need to change build tools for this to work +#if OMP_45_ENABLED +// +// Task Reduction implementation +// + +typedef struct kmp_task_red_flags { + unsigned lazy_priv : 1; // hint: (1) use lazy allocation (big objects) + unsigned reserved31 : 31; +} kmp_task_red_flags_t; + +// internal structure for reduction data item related info +typedef struct kmp_task_red_data { + void *reduce_shar; // shared reduction item + size_t reduce_size; // size of data item + void *reduce_priv; // thread specific data + void *reduce_pend; // end of private data for comparison op + void *reduce_init; // data initialization routine + void *reduce_fini; // data finalization routine + void *reduce_comb; // data combiner routine + kmp_task_red_flags_t flags; // flags for additional info from compiler +} kmp_task_red_data_t; + +// structure sent us by compiler - one per reduction item +typedef struct kmp_task_red_input { + void *reduce_shar; // shared reduction item + size_t reduce_size; // size of data item + void *reduce_init; // data initialization routine + void *reduce_fini; // data finalization routine + void *reduce_comb; // data combiner routine + kmp_task_red_flags_t flags; // flags for additional info from compiler +} kmp_task_red_input_t; + +/*! +@ingroup TASKING +@param gtid Global thread ID +@param num Number of data items to reduce +@param data Array of data for reduction +@return The taskgroup identifier + +Initialize task reduction for the taskgroup. +*/ +void* +__kmpc_task_reduction_init(int gtid, int num, void *data) +{ + kmp_info_t * thread = __kmp_threads[gtid]; + kmp_taskgroup_t * tg = thread->th.th_current_task->td_taskgroup; + kmp_int32 nth = thread->th.th_team_nproc; + kmp_task_red_input_t *input = (kmp_task_red_input_t*)data; + kmp_task_red_data_t *arr; + + // check input data just in case + KMP_ASSERT(tg != NULL); + KMP_ASSERT(data != NULL); + KMP_ASSERT(num > 0); + if (nth == 1) { + KA_TRACE(10, ("__kmpc_task_reduction_init: T#%d, tg %p, exiting nth=1\n", + gtid, tg)); + return (void*)tg; + } + KA_TRACE(10,("__kmpc_task_reduction_init: T#%d, taskgroup %p, #items %d\n", + gtid, tg, num)); + arr = (kmp_task_red_data_t*)__kmp_thread_malloc(thread, num * sizeof(kmp_task_red_data_t)); + for (int i = 0; i < num; ++i) { + void(*f_init)(void*) = (void(*)(void*))(input[i].reduce_init); + size_t size = input[i].reduce_size - 1; + // round the size up to cache line per thread-specific item + size += CACHE_LINE - size % CACHE_LINE; + KMP_ASSERT(input[i].reduce_comb != NULL); // combiner is mandatory + arr[i].reduce_shar = input[i].reduce_shar; + arr[i].reduce_size = size; + arr[i].reduce_init = input[i].reduce_init; + arr[i].reduce_fini = input[i].reduce_fini; + arr[i].reduce_comb = input[i].reduce_comb; + arr[i].flags = input[i].flags; + if (!input[i].flags.lazy_priv) { + // allocate cache-line aligned block and fill it with zeros + arr[i].reduce_priv = __kmp_allocate(nth * size); + arr[i].reduce_pend = (char*)(arr[i].reduce_priv) + nth * size; + if (f_init != NULL) { + // initialize thread-specific items + for (int j = 0; j < nth; ++j) { + f_init((char*)(arr[i].reduce_priv) + j * size); + } + } + } else { + // only allocate space for pointers now, + // objects will be lazily allocated/initialized once requested + arr[i].reduce_priv = __kmp_allocate(nth * sizeof(void*)); + } + } + tg->reduce_data = (void*)arr; + tg->reduce_num_data = num; + return (void*)tg; +} + +/*! +@ingroup TASKING +@param gtid Global thread ID +@param tskgrp The taskgroup ID (optional) +@param data Shared location of the item +@return The pointer to per-thread data + +Get thread-specific location of data item +*/ +void* +__kmpc_task_reduction_get_th_data(int gtid, void *tskgrp, void *data) +{ + kmp_info_t * thread = __kmp_threads[gtid]; + kmp_int32 nth = thread->th.th_team_nproc; + if (nth == 1) + return data; // nothing to do + + kmp_taskgroup_t *tg = (kmp_taskgroup_t*)tskgrp; + if (tg == NULL) + tg = thread->th.th_current_task->td_taskgroup; + KMP_ASSERT(tg != NULL); + kmp_task_red_data_t *arr = (kmp_task_red_data_t*)(tg->reduce_data); + kmp_int32 num = tg->reduce_num_data; + kmp_int32 tid = thread->th.th_info.ds.ds_tid; + + KMP_ASSERT(data != NULL); + while (tg != NULL) { + for (int i = 0; i < num; ++i) { + if (!arr[i].flags.lazy_priv) { + if (data == arr[i].reduce_shar || + (data >= arr[i].reduce_priv && data < arr[i].reduce_pend)) + return (char*)(arr[i].reduce_priv) + tid * arr[i].reduce_size; + } else { + // check shared location first + void **p_priv = (void**)(arr[i].reduce_priv); + if (data == arr[i].reduce_shar) + goto found; + // check if we get some thread specific location as parameter + for (int j = 0; j < nth; ++j) + if (data == p_priv[j]) + goto found; + continue; // not found, continue search + found: + if (p_priv[tid] == NULL) { + // allocate thread specific object lazily + void(*f_init)(void*) = (void(*)(void*))(arr[i].reduce_init); + p_priv[tid] = __kmp_allocate(arr[i].reduce_size); + if (f_init != NULL) { + f_init(p_priv[tid]); + } + } + return p_priv[tid]; + } + } + tg = tg->parent; + arr = (kmp_task_red_data_t*)(tg->reduce_data); + num = tg->reduce_num_data; + } + KMP_ASSERT2(0, "Unknown task reduction item"); + return NULL; // ERROR, this line never executed +} + +// Finalize task reduction. +// Called from __kmpc_end_taskgroup() +static void +__kmp_task_reduction_fini(kmp_info_t *th, kmp_taskgroup_t *tg) +{ + kmp_int32 nth = th->th.th_team_nproc; + KMP_DEBUG_ASSERT(nth > 1); // should not be called if nth == 1 + kmp_task_red_data_t *arr = (kmp_task_red_data_t*)tg->reduce_data; + kmp_int32 num = tg->reduce_num_data; + for (int i = 0; i < num; ++i) { + void *sh_data = arr[i].reduce_shar; + void(*f_fini)(void*) = (void(*)(void*))(arr[i].reduce_fini); + void(*f_comb)(void*,void*) = (void(*)(void*,void*))(arr[i].reduce_comb); + if (!arr[i].flags.lazy_priv) { + void *pr_data = arr[i].reduce_priv; + size_t size = arr[i].reduce_size; + for (int j = 0; j < nth; ++j) { + void * priv_data = (char*)pr_data + j * size; + f_comb(sh_data, priv_data); // combine results + if (f_fini) + f_fini(priv_data); // finalize if needed + } + } else { + void **pr_data = (void**)(arr[i].reduce_priv); + for (int j = 0; j < nth; ++j) { + if (pr_data[j] != NULL) { + f_comb(sh_data, pr_data[j]); // combine results + if (f_fini) + f_fini(pr_data[j]); // finalize if needed + __kmp_free(pr_data[j]); + } + } + } + __kmp_free(arr[i].reduce_priv); + } + __kmp_thread_free(th, arr); + tg->reduce_data = NULL; + tg->reduce_num_data = 0; +} +#endif #if OMP_40_ENABLED //------------------------------------------------------------------------------------- @@ -1612,6 +1810,11 @@ __kmpc_taskgroup( ident_t* loc, int gtid ) tg_new->count = 0; tg_new->cancel_request = cancel_noreq; tg_new->parent = taskdata->td_taskgroup; +// TODO: change to OMP_50_ENABLED, need to change build tools for this to work +#if OMP_45_ENABLED + tg_new->reduce_data = NULL; + tg_new->reduce_num_data = 0; +#endif taskdata->td_taskgroup = tg_new; } @@ -1660,6 +1863,11 @@ __kmpc_end_taskgroup( ident_t* loc, int gtid ) } KMP_DEBUG_ASSERT( taskgroup->count == 0 ); +// TODO: change to OMP_50_ENABLED, need to change build tools for this to work +#if OMP_45_ENABLED + if( taskgroup->reduce_data != NULL ) // need to reduce? + __kmp_task_reduction_fini(thread, taskgroup); +#endif // Restore parent taskgroup for the current task taskdata->td_taskgroup = taskgroup->parent; __kmp_thread_free( thread, taskgroup ); diff --git a/openmp/runtime/test/tasking/kmp_task_reduction_nest.cpp b/openmp/runtime/test/tasking/kmp_task_reduction_nest.cpp new file mode 100644 index 000000000000..d54de594c5bf --- /dev/null +++ b/openmp/runtime/test/tasking/kmp_task_reduction_nest.cpp @@ -0,0 +1,374 @@ +// RUN: %libomp-compile-and-run +// RUN: %libomp-compile -DFLG=1 && %libomp-run +#include +#include +#include +#include + +// Total number of loop iterations, should be multiple of T for this test +#define N 10000 + +// Flag to request lazy (1) or eager (0) allocation of reduction objects +#ifndef FLG +#define FLG 0 +#endif + +/* + // initial user's code that corresponds to pseudo code of the test + #pragma omp taskgroup task_reduction(+:i,j) task_reduction(*:x) + { + for( int l = 0; l < N; ++l ) { + #pragma omp task firstprivate(l) in_reduction(+:i) in_reduction(*:x) + { + i += l; + if( l%2 ) + x *= 1.0 / (l + 1); + else + x *= (l + 1); + } + } + + #pragma omp taskgroup task_reduction(-:i,k) task_reduction(+:y) + { + for( int l = 0; l < N; ++l ) { + #pragma omp task firstprivate(l) in_reduction(+:j,y) \ + in_reduction(*:x) in_reduction(-:k) + { + j += l; + k -= l; + y += (double)l; + if( l%2 ) + x *= 1.0 / (l + 1); + else + x *= (l + 1); + } + #pragma omp task firstprivate(l) in_reduction(+:y) in_reduction(-:i,k) + { + i -= l; + k -= l; + y += (double)l; + } + #pragma omp task firstprivate(l) in_reduction(+:j) in_reduction(*:x) + { + j += l; + if( l%2 ) + x *= 1.0 / (l + 1); + else + x *= (l + 1); + } + } + } // inner reduction + + for( int l = 0; l < N; ++l ) { + #pragma omp task firstprivate(l) in_reduction(+:j) + j += l; + } + } // outer reduction +*/ + +//------------------------------------------------ +// OpenMP runtime library routines +#ifdef __cplusplus +extern "C" { +#endif +extern void* __kmpc_task_reduction_get_th_data(int gtid, void* tg, void* item); +extern void* __kmpc_task_reduction_init(int gtid, int num, void* data); +extern int __kmpc_global_thread_num(void*); +#ifdef __cplusplus +} +#endif + +//------------------------------------------------ +// Compiler-generated code + +typedef struct _task_red_item { + void *shar; // shared reduction item + size_t size; // size of data item + void *f_init; // data initialization routine + void *f_fini; // data finalization routine + void *f_comb; // data combiner routine + unsigned flags; +} _task_red_item_t; + +// int:+ no need in init/fini callbacks, valid for subtraction +void __red_int_add_comb(void *lhs, void *rhs) // combiner +{ *(int*)lhs += *(int*)rhs; } + +// long long:+ no need in init/fini callbacks, valid for subtraction +void __red_llong_add_comb(void *lhs, void *rhs) // combiner +{ *(long long*)lhs += *(long long*)rhs; } + +// double:* no need in fini callback +void __red_dbl_mul_init(void *data) // initializer +{ *(double*)data = 1.0; } +void __red_dbl_mul_comb(void *lhs, void *rhs) // combiner +{ *(double*)lhs *= *(double*)rhs; } + +// double:+ no need in init/fini callbacks +void __red_dbl_add_comb(void *lhs, void *rhs) // combiner +{ *(double*)lhs += *(double*)rhs; } + +// ============================== + +void calc_serial(int *pi, long long *pj, double *px, long long *pk, double *py) +{ + for( int l = 0; l < N; ++l ) { + *pi += l; + if( l%2 ) + *px *= 1.0 / (l + 1); + else + *px *= (l + 1); + } + for( int l = 0; l < N; ++l ) { + *pj += l; + *pk -= l; + *py += (double)l; + if( l%2 ) + *px *= 1.0 / (l + 1); + else + *px *= (l + 1); + + *pi -= l; + *pk -= l; + *py += (double)l; + + *pj += l; + if( l%2 ) + *px *= 1.0 / (l + 1); + else + *px *= (l + 1); + } + for( int l = 0; l < N; ++l ) { + *pj += l; + } +} + +//------------------------------------------------ +// Test case +int main() +{ + int nthreads = omp_get_max_threads(); + int err = 0; + void** ptrs = (void**)malloc(nthreads*sizeof(void*)); + + // user's code ====================================== + // variables for serial calculations: + int is = 3; + long long js = -9999999; + double xs = 99999.0; + long long ks = 99999999; + double ys = -99999999.0; + // variables for parallel calculations: + int ip = 3; + long long jp = -9999999; + double xp = 99999.0; + long long kp = 99999999; + double yp = -99999999.0; + + calc_serial(&is, &js, &xs, &ks, &ys); + // ================================================== + for (int i = 0; i < nthreads; ++i) + ptrs[i] = NULL; + #pragma omp parallel + { + #pragma omp single nowait + { + // outer taskgroup reduces (i,j,x) + #pragma omp taskgroup // task_reduction(+:i,j) task_reduction(*:x) + { + _task_red_item_t red_data[3]; + red_data[0].shar = &ip; + red_data[0].size = sizeof(ip); + red_data[0].f_init = NULL; // RTL will zero thread-specific objects + red_data[0].f_fini = NULL; // no destructors needed + red_data[0].f_comb = (void*)&__red_int_add_comb; + red_data[0].flags = FLG; + red_data[1].shar = &jp; + red_data[1].size = sizeof(jp); + red_data[1].f_init = NULL; // RTL will zero thread-specific objects + red_data[1].f_fini = NULL; // no destructors needed + red_data[1].f_comb = (void*)&__red_llong_add_comb; + red_data[1].flags = FLG; + red_data[2].shar = &xp; + red_data[2].size = sizeof(xp); + red_data[2].f_init = (void*)&__red_dbl_mul_init; + red_data[2].f_fini = NULL; // no destructors needed + red_data[2].f_comb = (void*)&__red_dbl_mul_comb; + red_data[2].flags = FLG; + int gtid = __kmpc_global_thread_num(NULL); + void* tg1 = __kmpc_task_reduction_init(gtid, 3, red_data); + + for( int l = 0; l < N; l += 2 ) { + // 2 iterations per task to get correct x value; actually any even + // number of iters per task will work, otherwise x looses precision + #pragma omp task firstprivate(l) //in_reduction(+:i) in_reduction(*:x) + { + int gtid = __kmpc_global_thread_num(NULL); + int *p_ip = (int*)__kmpc_task_reduction_get_th_data(gtid, tg1, &ip); + double *p_xp = (double*)__kmpc_task_reduction_get_th_data( + gtid, tg1, &xp); + if (!ptrs[gtid]) ptrs[gtid] = p_xp; + + // user's pseudo-code ============================== + *p_ip += l; + *p_xp *= (l + 1); + + *p_ip += l + 1; + *p_xp *= 1.0 / (l + 2); + // ================================================== + } + } + // inner taskgroup reduces (i,k,y), i is same object as in outer one + #pragma omp taskgroup // task_reduction(-:i,k) task_reduction(+:y) + { + _task_red_item_t red_data[3]; + red_data[0].shar = &ip; + red_data[0].size = sizeof(ip); + red_data[0].f_init = NULL; // RTL will zero thread-specific objects + red_data[0].f_fini = NULL; // no destructors needed + red_data[0].f_comb = (void*)&__red_int_add_comb; + red_data[0].flags = FLG; + red_data[1].shar = &kp; + red_data[1].size = sizeof(kp); + red_data[1].f_init = NULL; // RTL will zero thread-specific objects + red_data[1].f_fini = NULL; // no destructors needed + red_data[1].f_comb = (void*)&__red_llong_add_comb; // same for + and - + red_data[1].flags = FLG; + red_data[2].shar = &yp; + red_data[2].size = sizeof(yp); + red_data[2].f_init = NULL; // RTL will zero thread-specific objects + red_data[2].f_fini = NULL; // no destructors needed + red_data[2].f_comb = (void*)&__red_dbl_add_comb; + red_data[2].flags = FLG; + int gtid = __kmpc_global_thread_num(NULL); + void* tg2 = __kmpc_task_reduction_init(gtid, 3, red_data); + + for( int l = 0; l < N; l += 2 ) { + #pragma omp task firstprivate(l) + // in_reduction(+:j,y) in_reduction(*:x) in_reduction(-:k) + { + int gtid = __kmpc_global_thread_num(NULL); + long long *p_jp = (long long*)__kmpc_task_reduction_get_th_data( + gtid, tg1, &jp); + long long *p_kp = (long long*)__kmpc_task_reduction_get_th_data( + gtid, tg2, &kp); + double *p_xp = (double*)__kmpc_task_reduction_get_th_data( + gtid, tg1, &xp); + double *p_yp = (double*)__kmpc_task_reduction_get_th_data( + gtid, tg2, &yp); + // user's pseudo-code ============================== + *p_jp += l; + *p_kp -= l; + *p_yp += (double)l; + *p_xp *= (l + 1); + + *p_jp += l + 1; + *p_kp -= l + 1; + *p_yp += (double)(l + 1); + *p_xp *= 1.0 / (l + 2); + // ================================================= +{ + // the following code is here just to check __kmpc_task_reduction_get_th_data: + int tid = omp_get_thread_num(); + void *addr1; + void *addr2; + addr1 = __kmpc_task_reduction_get_th_data(gtid, tg1, &xp); // from shared + addr2 = __kmpc_task_reduction_get_th_data(gtid, tg1, addr1); // from private + if (addr1 != addr2) { + #pragma omp atomic + ++err; + printf("Wrong thread-specific addresses %d s:%p p:%p\n", tid, addr1, addr2); + } + // from neighbour w/o taskgroup (should start lookup from current tg2) + if (tid > 0) { + if (ptrs[tid-1]) { + addr2 = __kmpc_task_reduction_get_th_data(gtid, NULL, ptrs[tid-1]); + if (addr1 != addr2) { + #pragma omp atomic + ++err; + printf("Wrong thread-specific addresses %d s:%p n:%p\n", + tid, addr1, addr2); + } + } + } else { + if (ptrs[nthreads-1]) { + addr2 = __kmpc_task_reduction_get_th_data(gtid, NULL, ptrs[nthreads-1]); + if (addr1 != addr2) { + #pragma omp atomic + ++err; + printf("Wrong thread-specific addresses %d s:%p n:%p\n", + tid, addr1, addr2); + } + } + } + // ---------------------------------------------- +} + } + #pragma omp task firstprivate(l) + // in_reduction(+:y) in_reduction(-:i,k) + { + int gtid = __kmpc_global_thread_num(NULL); + int *p_ip = (int*)__kmpc_task_reduction_get_th_data( + gtid, tg2, &ip); + long long *p_kp = (long long*)__kmpc_task_reduction_get_th_data( + gtid, tg2, &kp); + double *p_yp = (double*)__kmpc_task_reduction_get_th_data( + gtid, tg2, &yp); + + // user's pseudo-code ============================== + *p_ip -= l; + *p_kp -= l; + *p_yp += (double)l; + + *p_ip -= l + 1; + *p_kp -= l + 1; + *p_yp += (double)(l + 1); + // ================================================= + } + #pragma omp task firstprivate(l) + // in_reduction(+:j) in_reduction(*:x) + { + int gtid = __kmpc_global_thread_num(NULL); + long long *p_jp = (long long*)__kmpc_task_reduction_get_th_data( + gtid, tg1, &jp); + double *p_xp = (double*)__kmpc_task_reduction_get_th_data( + gtid, tg1, &xp); + // user's pseudo-code ============================== + *p_jp += l; + *p_xp *= (l + 1); + + *p_jp += l + 1; + *p_xp *= 1.0 / (l + 2); + // ================================================= + } + } + } // inner reduction + + for( int l = 0; l < N; l += 2 ) { + #pragma omp task firstprivate(l) // in_reduction(+:j) + { + int gtid = __kmpc_global_thread_num(NULL); + long long *p_jp = (long long*)__kmpc_task_reduction_get_th_data( + gtid, tg1, &jp); + // user's pseudo-code ============================== + *p_jp += l; + *p_jp += l + 1; + // ================================================= + } + } + } // outer reduction + } // end single + } // end parallel + // check results +#if _DEBUG + printf("reduction flags = %u\n", FLG); +#endif + if (ip == is && jp == js && ks == kp && + fabs(xp - xs) < 0.01 && fabs(yp - ys) < 0.01) + printf("passed\n"); + else + printf("failed,\n ser:(%d %lld %f %lld %f)\n par:(%d %lld %f %lld %f)\n", + is, js, xs, ks, ys, + ip, jp, xp, kp, yp); + return 0; +}