From 6043ac550021d6710b2721e05acf912d5c90dfe8 Mon Sep 17 00:00:00 2001 From: Edgar Gabriel Date: Mon, 7 Oct 2024 13:08:05 -0500 Subject: [PATCH 1/3] fcoll/vulcan: cleanup write_all operations This is a cleanup of the write_all operation, in preparation for adding a read_all implementation to the vulcan component. Specifically: - remove the multiple group option: this was envisioned for the vulcan component, but never fully implemented. Therefore, having a few stubs at some locations (and an mca parameter) doesn't make sense; they are being removed to simplify the code. - remove the write_chunksize option: this was an artifact of the code base having evolved from dynamic_gen2 (where the option makes sense). However, in vulcan it doesn't really make sense and was not correctly implemented either way, so if somebody had used that option, it would probably have failed. The changes have been validated with the ompio testsuite as well as the hdf5 testphdf5, t_shapesame, and t_filters_parallel tests. Signed-off-by: Edgar Gabriel --- ompi/mca/fcoll/vulcan/fcoll_vulcan.h | 2 - .../mca/fcoll/vulcan/fcoll_vulcan_component.c | 16 -- .../vulcan/fcoll_vulcan_file_write_all.c | 226 ++++-------------- 3 files changed, 46 insertions(+), 198 deletions(-) diff --git a/ompi/mca/fcoll/vulcan/fcoll_vulcan.h b/ompi/mca/fcoll/vulcan/fcoll_vulcan.h index a2fd6ca82bc..fc94b582cfa 100644 --- a/ompi/mca/fcoll/vulcan/fcoll_vulcan.h +++ b/ompi/mca/fcoll/vulcan/fcoll_vulcan.h @@ -40,8 +40,6 @@ BEGIN_C_DECLS /* Globally exported variables */ extern int mca_fcoll_vulcan_priority; -extern int mca_fcoll_vulcan_num_groups; -extern int mca_fcoll_vulcan_write_chunksize; extern int mca_fcoll_vulcan_async_io; extern int mca_fcoll_vulcan_use_accelerator_buffers; diff --git a/ompi/mca/fcoll/vulcan/fcoll_vulcan_component.c b/ompi/mca/fcoll/vulcan/fcoll_vulcan_component.c index 80a5bfb872a..18e59832ceb 100644 --- a/ompi/mca/fcoll/vulcan/fcoll_vulcan_component.c +++ b/ompi/mca/fcoll/vulcan/fcoll_vulcan_component.c @@ -43,8 +43,6 @@ const char *mca_fcoll_vulcan_component_version_string = * Global variables */ int mca_fcoll_vulcan_priority = 10; -int mca_fcoll_vulcan_num_groups = 1; -int mca_fcoll_vulcan_write_chunksize = -1; int mca_fcoll_vulcan_async_io = 0; /* @@ -91,20 +89,6 @@ vulcan_register(void) OPAL_INFO_LVL_9, MCA_BASE_VAR_SCOPE_READONLY, &mca_fcoll_vulcan_priority); - mca_fcoll_vulcan_num_groups = 1; - (void) mca_base_component_var_register(&mca_fcoll_vulcan_component.fcollm_version, - "num_groups", "Number of subgroups created by the vulcan component", - MCA_BASE_VAR_TYPE_INT, NULL, 0, 0, - OPAL_INFO_LVL_9, - MCA_BASE_VAR_SCOPE_READONLY, &mca_fcoll_vulcan_num_groups); - - mca_fcoll_vulcan_write_chunksize = -1; - (void) mca_base_component_var_register(&mca_fcoll_vulcan_component.fcollm_version, - "write_chunksize", "Chunk size written at once. Default: stripe_size of the file system", - MCA_BASE_VAR_TYPE_INT, NULL, 0, 0, - OPAL_INFO_LVL_9, - MCA_BASE_VAR_SCOPE_READONLY, &mca_fcoll_vulcan_write_chunksize); - mca_fcoll_vulcan_async_io = 0; (void) mca_base_component_var_register(&mca_fcoll_vulcan_component.fcollm_version, "async_io", "Asynchronous I/O support options. 
0: Automatic choice (default) " diff --git a/ompi/mca/fcoll/vulcan/fcoll_vulcan_file_write_all.c b/ompi/mca/fcoll/vulcan/fcoll_vulcan_file_write_all.c index 5f89fba8d01..4b2b24ce03c 100644 --- a/ompi/mca/fcoll/vulcan/fcoll_vulcan_file_write_all.c +++ b/ompi/mca/fcoll/vulcan/fcoll_vulcan_file_write_all.c @@ -56,7 +56,7 @@ typedef struct mca_io_ompio_aggregator_data { int **blocklen_per_process; MPI_Aint **displs_per_process, total_bytes, bytes_per_cycle, total_bytes_written; MPI_Comm comm; - char *buf, *global_buf, *prev_global_buf; + char *global_buf, *prev_global_buf; ompi_datatype_t **recvtype, **prev_recvtype; struct iovec *global_iov_array; int current_index, current_position; @@ -95,7 +95,7 @@ static int shuffle_init ( int index, int cycles, int aggregator, int rank, mca_io_ompio_aggregator_data *data, ompi_request_t **reqs ); static int write_init (ompio_file_t *fh, int aggregator, mca_io_ompio_aggregator_data *aggr_data, - int write_chunksize, int write_synchType, ompi_request_t **request, + int write_syncType, ompi_request_t **request, bool is_accelerator_buffer); int mca_fcoll_vulcan_break_file_view ( struct iovec *decoded_iov, int iov_count, struct iovec *local_iov_array, int local_count, @@ -106,7 +106,7 @@ int mca_fcoll_vulcan_break_file_view ( struct iovec *decoded_iov, int iov_count, int mca_fcoll_vulcan_get_configuration (ompio_file_t *fh, int num_io_procs, - int num_groups, size_t max_data); + size_t max_data); static int local_heap_sort (mca_io_ompio_local_io_array *io_array, @@ -143,7 +143,6 @@ int mca_fcoll_vulcan_file_write_all (struct ompio_file_t *fh, ptrdiff_t *displs = NULL; int vulcan_num_io_procs; size_t max_data = 0; - MPI_Aint *total_bytes_per_process = NULL; struct iovec **broken_iov_arrays=NULL; struct iovec **broken_decoded_iovs=NULL; @@ -153,7 +152,7 @@ int mca_fcoll_vulcan_file_write_all (struct ompio_file_t *fh, int aggr_index = NOT_AGGR_INDEX; int write_synch_type = 2; - int write_chunksize, *result_counts=NULL; + int *result_counts=NULL; ompi_count_array_t fview_count_desc; ompi_disp_array_t displs_desc; @@ -192,7 +191,6 @@ int mca_fcoll_vulcan_file_write_all (struct ompio_file_t *fh, /* since we want to overlap 2 iterations, define the bytes_per_cycle to be half of what the user requested */ bytes_per_cycle =bytes_per_cycle/2; - write_chunksize = bytes_per_cycle; ret = mca_common_ompio_decode_datatype ((struct ompio_file_t *) fh, datatype, @@ -210,8 +208,7 @@ int mca_fcoll_vulcan_file_write_all (struct ompio_file_t *fh, status->_ucount = max_data; } - - ret = mca_fcoll_vulcan_get_configuration (fh, vulcan_num_io_procs, mca_fcoll_vulcan_num_groups, max_data); + ret = mca_fcoll_vulcan_get_configuration (fh, vulcan_num_io_procs, max_data); if (OMPI_SUCCESS != ret){ goto exit; } @@ -227,7 +224,6 @@ int mca_fcoll_vulcan_file_write_all (struct ompio_file_t *fh, aggr_data[i]->procs_per_group = fh->f_procs_per_group; aggr_data[i]->procs_in_group = fh->f_procs_in_group; aggr_data[i]->comm = fh->f_comm; - aggr_data[i]->buf = (char *)buf; // should not be used in the new version. // Identify if the process is an aggregator. // If so, aggr_index would be its index in "aggr_data" and "aggregators" arrays. 
if(fh->f_aggr_list[i] == fh->f_rank) { @@ -270,52 +266,15 @@ int mca_fcoll_vulcan_file_write_all (struct ompio_file_t *fh, #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN start_comm_time = MPI_Wtime(); #endif - if ( 1 == mca_fcoll_vulcan_num_groups ) { - ret = fh->f_comm->c_coll->coll_allreduce (MPI_IN_PLACE, - broken_total_lengths, - fh->f_num_aggrs, - MPI_LONG, - MPI_SUM, - fh->f_comm, - fh->f_comm->c_coll->coll_allreduce_module); - if( OMPI_SUCCESS != ret){ - goto exit; - } - - } - else { - total_bytes_per_process = (MPI_Aint*)malloc - (fh->f_num_aggrs * fh->f_procs_per_group*sizeof(MPI_Aint)); - if (NULL == total_bytes_per_process) { - opal_output (1, "OUT OF MEMORY\n"); - ret = OMPI_ERR_OUT_OF_RESOURCE; - goto exit; - } - - ret = ompi_fcoll_base_coll_allgather_array (broken_total_lengths, - fh->f_num_aggrs, - MPI_LONG, - total_bytes_per_process, - fh->f_num_aggrs, - MPI_LONG, - 0, - fh->f_procs_in_group, - fh->f_procs_per_group, - fh->f_comm); - if( OMPI_SUCCESS != ret){ - goto exit; - } - - for ( i=0; if_num_aggrs; i++ ) { - broken_total_lengths[i] = 0; - for (j=0 ; jf_procs_per_group ; j++) { - broken_total_lengths[i] += total_bytes_per_process[j*fh->f_num_aggrs + i]; - } - } - if (NULL != total_bytes_per_process) { - free (total_bytes_per_process); - total_bytes_per_process = NULL; - } + ret = fh->f_comm->c_coll->coll_allreduce (MPI_IN_PLACE, + broken_total_lengths, + fh->f_num_aggrs, + MPI_LONG, + MPI_SUM, + fh->f_comm, + fh->f_comm->c_coll->coll_allreduce_module); + if( OMPI_SUCCESS != ret){ + goto exit; } #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN @@ -342,8 +301,7 @@ int mca_fcoll_vulcan_file_write_all (struct ompio_file_t *fh, #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN start_comm_time = MPI_Wtime(); #endif - if ( 1 == mca_fcoll_vulcan_num_groups ) { - ret = fh->f_comm->c_coll->coll_allgather(broken_counts, + ret = fh->f_comm->c_coll->coll_allgather(broken_counts, fh->f_num_aggrs, MPI_INT, result_counts, @@ -351,19 +309,6 @@ int mca_fcoll_vulcan_file_write_all (struct ompio_file_t *fh, MPI_INT, fh->f_comm, fh->f_comm->c_coll->coll_allgather_module); - } - else { - ret = ompi_fcoll_base_coll_allgather_array (broken_counts, - fh->f_num_aggrs, - MPI_INT, - result_counts, - fh->f_num_aggrs, - MPI_INT, - 0, - fh->f_procs_in_group, - fh->f_procs_per_group, - fh->f_comm); - } if( OMPI_SUCCESS != ret){ goto exit; } @@ -428,32 +373,17 @@ int mca_fcoll_vulcan_file_write_all (struct ompio_file_t *fh, #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN start_comm_time = MPI_Wtime(); #endif - if ( 1 == mca_fcoll_vulcan_num_groups ) { - OMPI_COUNT_ARRAY_INIT(&fview_count_desc, aggr_data[i]->fview_count); - OMPI_DISP_ARRAY_INIT(&displs_desc, displs); - ret = fh->f_comm->c_coll->coll_allgatherv (broken_iov_arrays[i], - broken_counts[i], - fh->f_iov_type, - aggr_data[i]->global_iov_array, - fview_count_desc, - displs_desc, - fh->f_iov_type, - fh->f_comm, - fh->f_comm->c_coll->coll_allgatherv_module ); - } - else { - ret = ompi_fcoll_base_coll_allgatherv_array (broken_iov_arrays[i], - broken_counts[i], - fh->f_iov_type, - aggr_data[i]->global_iov_array, - aggr_data[i]->fview_count, - displs, - fh->f_iov_type, - fh->f_aggr_list[i], - fh->f_procs_in_group, - fh->f_procs_per_group, - fh->f_comm); - } + OMPI_COUNT_ARRAY_INIT(&fview_count_desc, aggr_data[i]->fview_count); + OMPI_DISP_ARRAY_INIT(&displs_desc, displs); + ret = fh->f_comm->c_coll->coll_allgatherv (broken_iov_arrays[i], + broken_counts[i], + fh->f_iov_type, + aggr_data[i]->global_iov_array, + fview_count_desc, + displs_desc, + fh->f_iov_type, + fh->f_comm, + 
fh->f_comm->c_coll->coll_allgatherv_module ); if (OMPI_SUCCESS != ret){ goto exit; } @@ -632,7 +562,7 @@ int mca_fcoll_vulcan_file_write_all (struct ompio_file_t *fh, start_write_time = MPI_Wtime(); #endif ret = write_init (fh, fh->f_aggr_list[aggr_index], aggr_data[aggr_index], - write_chunksize, write_synch_type, &req_iwrite, use_accelerator_buffer); + write_synch_type, &req_iwrite, use_accelerator_buffer); if (OMPI_SUCCESS != ret){ goto exit; } @@ -672,7 +602,7 @@ int mca_fcoll_vulcan_file_write_all (struct ompio_file_t *fh, start_write_time = MPI_Wtime(); #endif ret = write_init (fh, fh->f_aggr_list[aggr_index], aggr_data[aggr_index], - write_chunksize, write_synch_type, &req_iwrite, use_accelerator_buffer); + write_synch_type, &req_iwrite, use_accelerator_buffer); if (OMPI_SUCCESS != ret){ goto exit; } @@ -781,30 +711,33 @@ exit : static int write_init (ompio_file_t *fh, int aggregator, mca_io_ompio_aggregator_data *aggr_data, - int write_chunksize, - int write_synchType, + int write_syncType, ompi_request_t **request, bool is_accelerator_buffer) { int ret = OMPI_SUCCESS; ssize_t ret_temp = 0; - int last_array_pos = 0; - int last_pos = 0; + int i; mca_ompio_request_t *ompio_req = NULL; mca_common_ompio_request_alloc ( &ompio_req, MCA_OMPIO_REQUEST_WRITE ); if (aggr_data->prev_num_io_entries) { - /* In this case, aggr_data->prev_num_io_entries is always == 1. - Therefore we can write the data of size aggr_data->prev_bytes_to_write in one iteration. - In fact, aggr_data->prev_bytes_to_write <= write_chunksize. - */ - mca_fcoll_vulcan_split_iov_array (fh, aggr_data->prev_io_array, - aggr_data->prev_num_io_entries, - &last_array_pos, &last_pos, - write_chunksize); + fh->f_num_of_io_entries = aggr_data->prev_num_io_entries; + fh->f_io_array = (mca_common_ompio_io_array_t *) malloc (fh->f_num_of_io_entries * + sizeof(mca_common_ompio_io_array_t)); + if ( NULL == fh->f_io_array ){ + opal_output (1,"Could not allocate memory\n"); + return -1; + } + + for (i = 0; i < fh->f_num_of_io_entries; i++) { + fh->f_io_array[i].memory_address = aggr_data->prev_io_array[i].memory_address; + fh->f_io_array[i].offset = aggr_data->prev_io_array[i].offset; + fh->f_io_array[i].length = aggr_data->prev_io_array[i].length; + } - if (1 == write_synchType) { + if (1 == write_syncType) { if (is_accelerator_buffer) { ret = mca_common_ompio_file_iwrite_pregen(fh, (ompi_request_t *) ompio_req); if(0 > ret) { @@ -1435,16 +1368,12 @@ static int mca_fcoll_vulcan_minmax ( ompio_file_t *fh, struct iovec *iov, int io fh->f_comm->c_coll->coll_allreduce ( &max, &globalmax, 1, MPI_LONG, MPI_MAX, fh->f_comm, fh->f_comm->c_coll->coll_allreduce_module); - // if ( fh->f_rank < 10 ) printf("[%d]: min=%ld max=%ld globalmin=%ld, globalmax=%ld num_aggregators=%d\n", fh->f_rank, min, max, globalmin, globalmax, num_aggregators); - stripe_size = (globalmax - globalmin)/num_aggregators; if ( (globalmax - globalmin) % num_aggregators ) { stripe_size++; } *new_stripe_size = stripe_size; - // if ( fh->f_rank == 0 ) - // printf(" partition size is %ld\n", stripe_size); return OMPI_SUCCESS; } @@ -1678,21 +1607,11 @@ int mca_fcoll_vulcan_break_file_view ( struct iovec *mem_iov, int mem_count, return ret; } - -int mca_fcoll_vulcan_get_configuration (ompio_file_t *fh, int num_io_procs, int num_groups, - size_t max_data) +int mca_fcoll_vulcan_get_configuration (ompio_file_t *fh, int num_io_procs, size_t max_data) { int i, ret; - ret = mca_common_ompio_set_aggregator_props (fh, num_io_procs, max_data); - /* Note: as of this version of the vulcan 
component, we are not using yet - the num_groups parameter to split the aggregators (and processes) into - distinct subgroups. This will however hopefullty be done in a second step - as well, allowing to keep communication just to individual subgroups of processes, - each subgroup using however the classic two-phase collective I/O algorithm - with multiple aggregators and even partitioning internally. - - For now, logically all processes are in a single group. */ + ret = mca_common_ompio_set_aggregator_props (fh, num_io_procs, max_data); fh->f_procs_per_group = fh->f_size; if ( NULL != fh->f_procs_in_group ) { @@ -1709,59 +1628,6 @@ int mca_fcoll_vulcan_get_configuration (ompio_file_t *fh, int num_io_procs, int return ret; } - -int mca_fcoll_vulcan_split_iov_array ( ompio_file_t *fh, mca_common_ompio_io_array_t *io_array, int num_entries, - int *ret_array_pos, int *ret_pos, int chunk_size ) -{ - - int array_pos = *ret_array_pos; - int pos = *ret_pos; - size_t bytes_written = 0; - size_t bytes_to_write = chunk_size; - - if ( 0 == array_pos && 0 == pos ) { - fh->f_io_array = (mca_common_ompio_io_array_t *) malloc ( num_entries * sizeof(mca_common_ompio_io_array_t)); - if ( NULL == fh->f_io_array ){ - opal_output (1,"Could not allocate memory\n"); - return -1; - } - } - - int i=0; - while (bytes_to_write > 0 ) { - fh->f_io_array[i].memory_address = &(((char *)io_array[array_pos].memory_address)[pos]); - fh->f_io_array[i].offset = &(((char *)io_array[array_pos].offset)[pos]); - - if ( (io_array[array_pos].length - pos ) >= bytes_to_write ) { - fh->f_io_array[i].length = bytes_to_write; - } - else { - fh->f_io_array[i].length = io_array[array_pos].length - pos; - } - - pos += fh->f_io_array[i].length; - bytes_written += fh->f_io_array[i].length; - bytes_to_write-= fh->f_io_array[i].length; - i++; - - if ( pos == (int)io_array[array_pos].length ) { - pos = 0; - if ((array_pos + 1) < num_entries) { - array_pos++; - } - else { - break; - } - } - } - - fh->f_num_of_io_entries = i; - *ret_array_pos = array_pos; - *ret_pos = pos; - return bytes_written; -} - - static int local_heap_sort (mca_io_ompio_local_io_array *io_array, int num_entries, int *sorted) From 4b356be052b40b75ea09ede2708a4b4d32049395 Mon Sep 17 00:00:00 2001 From: Edgar Gabriel Date: Tue, 8 Oct 2024 10:21:54 -0500 Subject: [PATCH 2/3] fcoll/vulcan: minor code reorg extract some code into stand alone routines in preparation for adding a vulcan read_all implementation. Specifically, this pr adds new routines for : - mca_fcoll_vulcan_calc_blocklen_disps - mca_fcoll_vulcan_calc_file_offsets - mca_fcoll_vulcan_calc_io_array which will be reused in the read_all as well. Also, some white-space cleanup of the code. 
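As a rough illustration of what the extracted mca_fcoll_vulcan_calc_io_array helper does (merging contiguous file extents into as few I/O entries as possible before handing them to the fbtl), here is a minimal standalone sketch. The extent_t type and merge_extents function are simplified stand-ins invented for this example, not the actual OMPIO structures or API.

#include <stdio.h>
#include <stddef.h>

/* Simplified stand-in for an OMPIO io-array entry: an (offset, length) pair. */
typedef struct {
    size_t offset;
    size_t length;
} extent_t;

/* Merge a list of extents that is already sorted by file offset: adjacent
 * entries that touch are collapsed into one, mirroring the idea behind
 * mca_fcoll_vulcan_calc_io_array. Returns the number of merged entries
 * written into 'out'. */
static int merge_extents(const extent_t *in, int n, extent_t *out)
{
    if (n <= 0) return 0;
    int m = 0;
    out[m] = in[0];
    for (int i = 1; i < n; i++) {
        if (out[m].offset + out[m].length == in[i].offset) {
            out[m].length += in[i].length;     /* contiguous: extend previous entry */
        } else {
            out[++m] = in[i];                  /* gap: start a new entry */
        }
    }
    return m + 1;
}

int main(void)
{
    extent_t in[] = { {0, 4}, {4, 8}, {16, 4}, {20, 4} };
    extent_t out[4];
    int m = merge_extents(in, 4, out);
    for (int i = 0; i < m; i++) {
        printf("offset=%zu length=%zu\n", out[i].offset, out[i].length);
    }
    return 0;   /* prints two merged extents: (0,12) and (16,8) */
}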
Signed-off-by: Edgar Gabriel --- .../vulcan/fcoll_vulcan_file_write_all.c | 847 ++++++++---------- 1 file changed, 392 insertions(+), 455 deletions(-) diff --git a/ompi/mca/fcoll/vulcan/fcoll_vulcan_file_write_all.c b/ompi/mca/fcoll/vulcan/fcoll_vulcan_file_write_all.c index 4b2b24ce03c..3748948bea5 100644 --- a/ompi/mca/fcoll/vulcan/fcoll_vulcan_file_write_all.c +++ b/ompi/mca/fcoll/vulcan/fcoll_vulcan_file_write_all.c @@ -1,3 +1,4 @@ +/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */ /* * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana * University Research and Technology @@ -91,36 +92,39 @@ typedef struct mca_io_ompio_aggregator_data { _aggr[_i]->prev_recvtype=(ompi_datatype_t **)_t; } \ } -static int shuffle_init ( int index, int cycles, int aggregator, int rank, - mca_io_ompio_aggregator_data *data, - ompi_request_t **reqs ); +static int shuffle_init (int index, int num_cycles, int aggregator, int rank, + mca_io_ompio_aggregator_data *data, ompi_request_t **reqs); static int write_init (ompio_file_t *fh, int aggregator, mca_io_ompio_aggregator_data *aggr_data, int write_syncType, ompi_request_t **request, bool is_accelerator_buffer); -int mca_fcoll_vulcan_break_file_view ( struct iovec *decoded_iov, int iov_count, - struct iovec *local_iov_array, int local_count, - struct iovec ***broken_decoded_iovs, int **broken_iov_counts, - struct iovec ***broken_iov_arrays, int **broken_counts, - MPI_Aint **broken_total_lengths, - int stripe_count, size_t stripe_size); +int mca_fcoll_vulcan_break_file_view (struct iovec *decoded_iov, int iov_count, + struct iovec *local_iov_array, int local_count, + struct iovec ***broken_decoded_iovs, int **broken_iov_counts, + struct iovec ***broken_iov_arrays, int **broken_counts, + MPI_Aint **broken_total_lengths, + int stripe_count, size_t stripe_size); -int mca_fcoll_vulcan_get_configuration (ompio_file_t *fh, int num_io_procs, +int mca_fcoll_vulcan_get_configuration (ompio_file_t *fh, int num_io_procs, size_t max_data); - static int local_heap_sort (mca_io_ompio_local_io_array *io_array, - int num_entries, - int *sorted); + int num_entries, int *sorted); -int mca_fcoll_vulcan_split_iov_array ( ompio_file_t *fh, mca_common_ompio_io_array_t *work_array, - int num_entries, int *last_array_pos, int *last_pos_in_field, - int chunk_size ); +int mca_fcoll_vulcan_minmax (ompio_file_t *fh, struct iovec *iov, int iov_count, int num_aggregators, + long *new_stripe_size); +void mca_fcoll_vulcan_calc_blocklen_disps (mca_io_ompio_aggregator_data *data, int aggregator, + int rank, size_t *bytes_comm); -static int mca_fcoll_vulcan_minmax ( ompio_file_t *fh, struct iovec *iov, int iov_count, int num_aggregators, - long *new_stripe_size); +int mca_fcoll_vulcan_calc_file_offsets(mca_io_ompio_aggregator_data *data, + mca_io_ompio_local_io_array *file_offsets_for_agg, + int *sorted_file_offsets, MPI_Aint *memory_displacements, + int entries_per_aggregator, int rank, int index); +void mca_fcoll_vulcan_calc_io_array(mca_common_ompio_io_array_t *io_array, int *num_io_entries, int max_io_arrays, + char *global_buf, mca_io_ompio_local_io_array *file_offsets_for_agg, + int *sorted_offsets, MPI_Aint *memory_displacements, int rank); int mca_fcoll_vulcan_file_write_all (struct ompio_file_t *fh, const void *buf, @@ -185,8 +189,8 @@ int mca_fcoll_vulcan_file_write_all (struct ompio_file_t *fh, mca_common_ompio_check_gpu_buf (fh, buf, &is_gpu, &is_managed); if (is_gpu && !is_managed && - fh->f_get_mca_parameter_value ("use_accelerator_buffers", 
strlen("use_accelerator_buffers"))) { - use_accelerator_buffer = true; + fh->f_get_mca_parameter_value ("use_accelerator_buffers", strlen("use_accelerator_buffers"))) { + use_accelerator_buffer = true; } /* since we want to overlap 2 iterations, define the bytes_per_cycle to be half of what the user requested */ @@ -205,12 +209,12 @@ int mca_fcoll_vulcan_file_write_all (struct ompio_file_t *fh, } if ( MPI_STATUS_IGNORE != status ) { - status->_ucount = max_data; + status->_ucount = max_data; } ret = mca_fcoll_vulcan_get_configuration (fh, vulcan_num_io_procs, max_data); if (OMPI_SUCCESS != ret){ - goto exit; + goto exit; } aggr_data = (mca_io_ompio_aggregator_data **) malloc ( fh->f_num_aggrs * @@ -236,11 +240,11 @@ int mca_fcoll_vulcan_file_write_all (struct ompio_file_t *fh, *** this write operation ********************************************************************/ ret = fh->f_generate_current_file_view( (struct ompio_file_t *) fh, - max_data, - &local_iov_array, - &local_count); + max_data, + &local_iov_array, + &local_count); if (ret != OMPI_SUCCESS){ - goto exit; + goto exit; } /************************************************************************* @@ -267,12 +271,12 @@ int mca_fcoll_vulcan_file_write_all (struct ompio_file_t *fh, start_comm_time = MPI_Wtime(); #endif ret = fh->f_comm->c_coll->coll_allreduce (MPI_IN_PLACE, - broken_total_lengths, - fh->f_num_aggrs, - MPI_LONG, - MPI_SUM, - fh->f_comm, - fh->f_comm->c_coll->coll_allreduce_module); + broken_total_lengths, + fh->f_num_aggrs, + MPI_LONG, + MPI_SUM, + fh->f_comm, + fh->f_comm->c_coll->coll_allreduce_module); if( OMPI_SUCCESS != ret){ goto exit; } @@ -302,13 +306,13 @@ int mca_fcoll_vulcan_file_write_all (struct ompio_file_t *fh, start_comm_time = MPI_Wtime(); #endif ret = fh->f_comm->c_coll->coll_allgather(broken_counts, - fh->f_num_aggrs, - MPI_INT, - result_counts, - fh->f_num_aggrs, - MPI_INT, - fh->f_comm, - fh->f_comm->c_coll->coll_allgather_module); + fh->f_num_aggrs, + MPI_INT, + result_counts, + fh->f_num_aggrs, + MPI_INT, + fh->f_comm, + fh->f_comm->c_coll->coll_allgather_module); if( OMPI_SUCCESS != ret){ goto exit; } @@ -373,17 +377,17 @@ int mca_fcoll_vulcan_file_write_all (struct ompio_file_t *fh, #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN start_comm_time = MPI_Wtime(); #endif - OMPI_COUNT_ARRAY_INIT(&fview_count_desc, aggr_data[i]->fview_count); - OMPI_DISP_ARRAY_INIT(&displs_desc, displs); - ret = fh->f_comm->c_coll->coll_allgatherv (broken_iov_arrays[i], - broken_counts[i], - fh->f_iov_type, - aggr_data[i]->global_iov_array, - fview_count_desc, - displs_desc, - fh->f_iov_type, - fh->f_comm, - fh->f_comm->c_coll->coll_allgatherv_module ); + OMPI_COUNT_ARRAY_INIT(&fview_count_desc, aggr_data[i]->fview_count); + OMPI_DISP_ARRAY_INIT(&displs_desc, displs); + ret = fh->f_comm->c_coll->coll_allgatherv (broken_iov_arrays[i], + broken_counts[i], + fh->f_iov_type, + aggr_data[i]->global_iov_array, + fview_count_desc, + displs_desc, + fh->f_iov_type, + fh->f_comm, + fh->f_comm->c_coll->coll_allgatherv_module ); if (OMPI_SUCCESS != ret){ goto exit; } @@ -469,8 +473,8 @@ int mca_fcoll_vulcan_file_write_all (struct ompio_file_t *fh, } if (use_accelerator_buffer) { - opal_output_verbose(10, ompi_fcoll_base_framework.framework_output, - "Allocating GPU device buffer for aggregation\n"); + opal_output_verbose(10, ompi_fcoll_base_framework.framework_output, + "Allocating GPU device buffer for aggregation\n"); ret = opal_accelerator.mem_alloc(MCA_ACCELERATOR_NO_DEVICE_ID, (void**)&aggr_data[i]->global_buf, bytes_per_cycle); 
if (OPAL_SUCCESS != ret) { @@ -513,10 +517,9 @@ int mca_fcoll_vulcan_file_write_all (struct ompio_file_t *fh, #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN start_exch = MPI_Wtime(); #endif - } + } reqs = (ompi_request_t **)malloc ((fh->f_procs_per_group + 1 )*fh->f_num_aggrs *sizeof(ompi_request_t *)); - if ( NULL == reqs ) { opal_output (1, "OUT OF MEMORY\n"); ret = OMPI_ERR_OUT_OF_RESOURCE; @@ -629,7 +632,7 @@ int mca_fcoll_vulcan_file_write_all (struct ompio_file_t *fh, nentry.aggregator = 0; for ( i=0; if_num_aggrs; i++ ) { if (fh->f_aggr_list[i] == fh->f_rank) - nentry.aggregator = 1; + nentry.aggregator = 1; } nentry.nprocs_for_coll = fh->f_num_aggrs; if (!mca_common_ompio_full_print_queue(fh->f_coll_write_time)){ @@ -637,15 +640,13 @@ int mca_fcoll_vulcan_file_write_all (struct ompio_file_t *fh, nentry); } #endif - - + exit : - + if ( NULL != aggr_data ) { - - for ( i=0; i< fh->f_num_aggrs; i++ ) { + for ( i=0; i< fh->f_num_aggrs; i++ ) { if (fh->f_aggr_list[i] == fh->f_rank) { - if (NULL != aggr_data[i]->recvtype){ + if (NULL != aggr_data[i]->recvtype) { for (j =0; j< aggr_data[i]->procs_per_group; j++) { if ( MPI_DATATYPE_NULL != aggr_data[i]->recvtype[j] ) { ompi_datatype_destroy(&aggr_data[i]->recvtype[j]); @@ -653,26 +654,25 @@ exit : if ( MPI_DATATYPE_NULL != aggr_data[i]->prev_recvtype[j] ) { ompi_datatype_destroy(&aggr_data[i]->prev_recvtype[j]); } - } free(aggr_data[i]->recvtype); free(aggr_data[i]->prev_recvtype); } - + free (aggr_data[i]->disp_index); free (aggr_data[i]->max_disp_index); - if (use_accelerator_buffer) { - opal_accelerator.mem_release(MCA_ACCELERATOR_NO_DEVICE_ID, aggr_data[i]->global_buf); - opal_accelerator.mem_release(MCA_ACCELERATOR_NO_DEVICE_ID, aggr_data[i]->prev_global_buf); - } else { - free (aggr_data[i]->global_buf); - free (aggr_data[i]->prev_global_buf); - } + if (use_accelerator_buffer) { + opal_accelerator.mem_release(MCA_ACCELERATOR_NO_DEVICE_ID, aggr_data[i]->global_buf); + opal_accelerator.mem_release(MCA_ACCELERATOR_NO_DEVICE_ID, aggr_data[i]->prev_global_buf); + } else { + free (aggr_data[i]->global_buf); + free (aggr_data[i]->prev_global_buf); + } for(l=0;lprocs_per_group;l++){ free (aggr_data[i]->blocklen_per_process[l]); free (aggr_data[i]->displs_per_process[l]); } - + free (aggr_data[i]->blocklen_per_process); free (aggr_data[i]->displs_per_process); } @@ -680,7 +680,6 @@ exit : free (aggr_data[i]->global_iov_array); free (aggr_data[i]->fview_count); free (aggr_data[i]->decoded_iov); - free (aggr_data[i]); } free (aggr_data); @@ -704,7 +703,7 @@ exit : fh->f_aggr_list=NULL; free(result_counts); free(reqs); - + return OMPI_SUCCESS; } @@ -786,21 +785,16 @@ static int write_init (ompio_file_t *fh, return ret; } -static int shuffle_init ( int index, int cycles, int aggregator, int rank, mca_io_ompio_aggregator_data *data, - ompi_request_t **reqs ) +static int shuffle_init (int index, int num_cycles, int aggregator, int rank, + mca_io_ompio_aggregator_data *data, ompi_request_t **reqs) { - int bytes_sent = 0; - int blocks=0, temp_pindex; - int i, j, l, ret; - int entries_per_aggregator=0; + size_t bytes_sent = 0; + int i, j, l; + int ret = OMPI_SUCCESS; + int entries_per_aggregator = 0; mca_io_ompio_local_io_array *file_offsets_for_agg=NULL; int *sorted_file_offsets=NULL; - int temp_index=0; MPI_Aint *memory_displacements=NULL; - int *temp_disp_index=NULL; -#if DEBUG_ON - MPI_Aint global_count = 0; -#endif int* blocklength_proc=NULL; ptrdiff_t* displs_proc=NULL; @@ -812,21 +806,19 @@ static int shuffle_init ( int index, int cycles, int aggregator, 
int rank, mca_i *** 7a. Getting ready for next cycle: initializing and freeing buffers **********************************************************************/ if (aggregator == rank) { - if (NULL != data->recvtype){ for (i =0; i< data->procs_per_group; i++) { - if ( MPI_DATATYPE_NULL != data->recvtype[i] ) { + if (MPI_DATATYPE_NULL != data->recvtype[i]) { ompi_datatype_destroy(&data->recvtype[i]); data->recvtype[i] = MPI_DATATYPE_NULL; } } } - - for(l=0;lprocs_per_group;l++){ + for(l = 0; l < data->procs_per_group; l++){ data->disp_index[l] = 0; - - if ( data->max_disp_index[l] == 0 ) { + + if (data->max_disp_index[l] == 0) { data->blocklen_per_process[l] = (int *) calloc (INIT_LEN, sizeof(int)); data->displs_per_process[l] = (MPI_Aint *) calloc (INIT_LEN, sizeof(MPI_Aint)); if (NULL == data->displs_per_process[l] || NULL == data->blocklen_per_process[l]){ @@ -835,25 +827,22 @@ static int shuffle_init ( int index, int cycles, int aggregator, int rank, mca_i goto exit; } data->max_disp_index[l] = INIT_LEN; - } - else { - memset ( data->blocklen_per_process[l], 0, data->max_disp_index[l]*sizeof(int) ); - memset ( data->displs_per_process[l], 0, data->max_disp_index[l]*sizeof(MPI_Aint) ); + } else { + memset (data->blocklen_per_process[l], 0, data->max_disp_index[l]*sizeof(int)); + memset (data->displs_per_process[l], 0, data->max_disp_index[l]*sizeof(MPI_Aint)); } } } /* (aggregator == rank */ - + /************************************************************************** *** 7b. Determine the number of bytes to be actually written in this cycle **************************************************************************/ int local_cycles= ceil((double)data->total_bytes / data->bytes_per_cycle); - if ( index < (local_cycles -1) ) { + if (index < (local_cycles -1)) { data->bytes_to_write_in_cycle = data->bytes_per_cycle; - } - else if ( index == (local_cycles -1)) { + } else if (index == (local_cycles -1)) { data->bytes_to_write_in_cycle = data->total_bytes - data->bytes_per_cycle*index ; - } - else { + } else { data->bytes_to_write_in_cycle = 0; } data->bytes_to_write = data->bytes_to_write_in_cycle; @@ -861,309 +850,57 @@ static int shuffle_init ( int index, int cycles, int aggregator, int rank, mca_i #if DEBUG_ON if (aggregator == rank) { printf ("****%d: CYCLE %d Bytes %lld**********\n", - rank, - index, - data->bytes_to_write_in_cycle); + rank, index, data->bytes_to_write_in_cycle); } #endif - /********************************************************** - **Gather the Data from all the processes at the writers ** - *********************************************************/ - -#if DEBUG_ON - printf("bytes_to_write_in_cycle: %ld, cycle : %d\n", data->bytes_to_write_in_cycle, - index); -#endif - + /***************************************************************** *** 7c. 
Calculate how much data will be contributed in this cycle *** by each process *****************************************************************/ - - /* The blocklen and displs calculation only done at aggregators!*/ - while (data->bytes_to_write_in_cycle) { - - /* This next block identifies which process is the holder - ** of the sorted[current_index] element; - */ - blocks = data->fview_count[0]; - for (j=0 ; jprocs_per_group ; j++) { - if (data->sorted[data->current_index] < blocks) { - data->n = j; - break; - } - else { - blocks += data->fview_count[j+1]; - } - } + mca_fcoll_vulcan_calc_blocklen_disps (data, aggregator, rank, &bytes_sent); - if (data->bytes_remaining) { - /* Finish up a partially used buffer from the previous cycle */ - - if (data->bytes_remaining <= data->bytes_to_write_in_cycle) { - /* The data fits completely into the block */ - if (aggregator == rank) { - data->blocklen_per_process[data->n][data->disp_index[data->n]] = data->bytes_remaining; - data->displs_per_process[data->n][data->disp_index[data->n]] = - (ptrdiff_t)data->global_iov_array[data->sorted[data->current_index]].iov_base + - (data->global_iov_array[data->sorted[data->current_index]].iov_len - - data->bytes_remaining); - - data->disp_index[data->n] += 1; - - /* In this cases the length is consumed so allocating for - next displacement and blocklength*/ - if ( data->disp_index[data->n] == data->max_disp_index[data->n] ) { - data->max_disp_index[data->n] *= 2; - - data->blocklen_per_process[data->n] = (int *) realloc - ((void *)data->blocklen_per_process[data->n], - (data->max_disp_index[data->n])*sizeof(int)); - data->displs_per_process[data->n] = (MPI_Aint *) realloc - ((void *)data->displs_per_process[data->n], - (data->max_disp_index[data->n])*sizeof(MPI_Aint)); - } - data->blocklen_per_process[data->n][data->disp_index[data->n]] = 0; - data->displs_per_process[data->n][data->disp_index[data->n]] = 0; - - } - if (data->procs_in_group[data->n] == rank) { - bytes_sent += data->bytes_remaining; - } - data->current_index ++; - data->bytes_to_write_in_cycle -= data->bytes_remaining; - data->bytes_remaining = 0; - } - else { - /* the remaining data from the previous cycle is larger than the - data->bytes_to_write_in_cycle, so we have to segment again */ - if (aggregator == rank) { - data->blocklen_per_process[data->n][data->disp_index[data->n]] = data->bytes_to_write_in_cycle; - data->displs_per_process[data->n][data->disp_index[data->n]] = - (ptrdiff_t)data->global_iov_array[data->sorted[data->current_index]].iov_base + - (data->global_iov_array[data->sorted[data->current_index]].iov_len - - data->bytes_remaining); - data->disp_index[data->n] += 1; - } - - if (data->procs_in_group[data->n] == rank) { - bytes_sent += data->bytes_to_write_in_cycle; - } - data->bytes_remaining -= data->bytes_to_write_in_cycle; - data->bytes_to_write_in_cycle = 0; - break; - } - } - else { - /* No partially used entry available, have to start a new one */ - if (data->bytes_to_write_in_cycle < - (MPI_Aint) data->global_iov_array[data->sorted[data->current_index]].iov_len) { - /* This entry has more data than we can sendin one cycle */ - if (aggregator == rank) { - data->blocklen_per_process[data->n][data->disp_index[data->n]] = data->bytes_to_write_in_cycle; - data->displs_per_process[data->n][data->disp_index[data->n]] = - (ptrdiff_t)data->global_iov_array[data->sorted[data->current_index]].iov_base ; - data->disp_index[data->n] += 1; - } - if (data->procs_in_group[data->n] == rank) { - bytes_sent += 
data->bytes_to_write_in_cycle; - - } - data->bytes_remaining = data->global_iov_array[data->sorted[data->current_index]].iov_len - - data->bytes_to_write_in_cycle; - data->bytes_to_write_in_cycle = 0; - break; - } - else { - /* Next data entry is less than data->bytes_to_write_in_cycle */ - if (aggregator == rank) { - data->blocklen_per_process[data->n][data->disp_index[data->n]] = - data->global_iov_array[data->sorted[data->current_index]].iov_len; - data->displs_per_process[data->n][data->disp_index[data->n]] = (ptrdiff_t) - data->global_iov_array[data->sorted[data->current_index]].iov_base; - - data->disp_index[data->n] += 1; - - /*realloc for next blocklength - and assign this displacement and check for next displs as - the total length of this entry has been consumed!*/ - if ( data->disp_index[data->n] == data->max_disp_index[data->n] ) { - data->max_disp_index[data->n] *=2 ; - data->blocklen_per_process[data->n] = (int *) realloc ( - (void *)data->blocklen_per_process[data->n], - (data->max_disp_index[data->n]*sizeof(int))); - data->displs_per_process[data->n] = (MPI_Aint *)realloc ( - (void *)data->displs_per_process[data->n], - (data->max_disp_index[data->n]*sizeof(MPI_Aint))); - } - data->blocklen_per_process[data->n][data->disp_index[data->n]] = 0; - data->displs_per_process[data->n][data->disp_index[data->n]] = 0; - } - if (data->procs_in_group[data->n] == rank) { - bytes_sent += data->global_iov_array[data->sorted[data->current_index]].iov_len; - } - data->bytes_to_write_in_cycle -= - data->global_iov_array[data->sorted[data->current_index]].iov_len; - data->current_index ++; - } - } - } - - /************************************************************************* - *** 7d. Calculate the displacement on where to put the data and allocate - *** the receive buffer (global_buf) + *** 7d. 
Calculate the displacement on where to put the data *************************************************************************/ if (aggregator == rank) { entries_per_aggregator=0; - for (i=0;iprocs_per_group; i++){ - for (j=0;jdisp_index[i];j++){ - if (data->blocklen_per_process[i][j] > 0) + for (i = 0; i < data->procs_per_group; i++){ + for (j = 0; j < data->disp_index[i];j++){ + if (data->blocklen_per_process[i][j] > 0) { entries_per_aggregator++ ; + } } } - #if DEBUG_ON - printf("%d: cycle: %d, bytes_sent: %d\n ",rank,index, - bytes_sent); - printf("%d : Entries per aggregator : %d\n",rank,entries_per_aggregator); + printf("%d : Entries per aggregator : %d\n", rank, entries_per_aggregator); #endif - - if (entries_per_aggregator > 0){ - file_offsets_for_agg = (mca_io_ompio_local_io_array *) - malloc(entries_per_aggregator*sizeof(mca_io_ompio_local_io_array)); - if (NULL == file_offsets_for_agg) { + + if (entries_per_aggregator > 0) { + file_offsets_for_agg = (mca_io_ompio_local_io_array *) malloc(entries_per_aggregator * + sizeof(mca_io_ompio_local_io_array)); + memory_displacements = (MPI_Aint *) malloc (entries_per_aggregator * sizeof(MPI_Aint)); + sorted_file_offsets = (int *) malloc (entries_per_aggregator*sizeof(int)); + if (NULL == memory_displacements || NULL == file_offsets_for_agg || + NULL == sorted_file_offsets) { opal_output (1, "OUT OF MEMORY\n"); ret = OMPI_ERR_OUT_OF_RESOURCE; goto exit; } - - sorted_file_offsets = (int *) - malloc (entries_per_aggregator*sizeof(int)); - if (NULL == sorted_file_offsets){ - opal_output (1, "OUT OF MEMORY\n"); - ret = OMPI_ERR_OUT_OF_RESOURCE; - goto exit; - } - - /*Moving file offsets to an IO array!*/ - temp_index = 0; - - for (i=0;iprocs_per_group; i++){ - for(j=0;jdisp_index[i];j++){ - if (data->blocklen_per_process[i][j] > 0){ - file_offsets_for_agg[temp_index].length = - data->blocklen_per_process[i][j]; - file_offsets_for_agg[temp_index].process_id = i; - file_offsets_for_agg[temp_index].offset = - data->displs_per_process[i][j]; - temp_index++; - -#if DEBUG_ON - printf("************Cycle: %d, Aggregator: %d ***************\n", - index+1,rank); - - printf("%d sends blocklen[%d]: %d, disp[%d]: %ld to %d\n", - data->procs_in_group[i],j, - data->blocklen_per_process[i][j],j, - data->displs_per_process[i][j], - rank); -#endif - } - } - } - - /* Sort the displacements for each aggregator*/ - local_heap_sort (file_offsets_for_agg, - entries_per_aggregator, - sorted_file_offsets); - - /*create contiguous memory displacements - based on blocklens on the same displs array - and map it to this aggregator's actual - file-displacements (this is in the io-array created above)*/ - memory_displacements = (MPI_Aint *) malloc - (entries_per_aggregator * sizeof(MPI_Aint)); - - memory_displacements[sorted_file_offsets[0]] = 0; - for (i=1; iprocs_per_group * sizeof (int)); - if (NULL == temp_disp_index) { - opal_output (1, "OUT OF MEMORY\n"); - ret = OMPI_ERR_OUT_OF_RESOURCE; + + ret = mca_fcoll_vulcan_calc_file_offsets(data, file_offsets_for_agg, sorted_file_offsets, + memory_displacements, entries_per_aggregator, + rank, index); + if (OMPI_SUCCESS != ret) { goto exit; } - - /*Now update the displacements array with memory offsets*/ -#if DEBUG_ON - global_count = 0; -#endif - for (i=0;idispls_per_process[temp_pindex][temp_disp_index[temp_pindex]] = - memory_displacements[sorted_file_offsets[i]]; - if (temp_disp_index[temp_pindex] < data->disp_index[temp_pindex]) - temp_disp_index[temp_pindex] += 1; - else{ - printf("temp_disp_index[%d]: %d is greater 
than disp_index[%d]: %d\n", - temp_pindex, temp_disp_index[temp_pindex], - temp_pindex, data->disp_index[temp_pindex]); - } -#if DEBUG_ON - global_count += - file_offsets_for_agg[sorted_file_offsets[i]].length; -#endif - } - - if (NULL != temp_disp_index){ - free(temp_disp_index); - temp_disp_index = NULL; - } - -#if DEBUG_ON - - printf("************Cycle: %d, Aggregator: %d ***************\n", - index+1,rank); - for (i=0;iprocs_per_group; i++){ - for(j=0;jdisp_index[i];j++){ - if (data->blocklen_per_process[i][j] > 0){ - printf("%d sends blocklen[%d]: %d, disp[%d]: %ld to %d\n", - data->procs_in_group[i],j, - data->blocklen_per_process[i][j],j, - data->displs_per_process[i][j], - rank); - - } - } - } - printf("************Cycle: %d, Aggregator: %d ***************\n", - index+1,rank); - for (i=0; iprocs_per_group; i++) { + for (i = 0; i < data->procs_per_group; i++) { size_t datatype_size; reqs[i] = MPI_REQUEST_NULL; - if ( 0 < data->disp_index[i] ) { + if (0 < data->disp_index[i]) { ompi_datatype_create_hindexed(data->disp_index[i], data->blocklen_per_process[i], data->displs_per_process[i], @@ -1172,7 +909,7 @@ static int shuffle_init ( int index, int cycles, int aggregator, int rank, mca_i ompi_datatype_commit(&data->recvtype[i]); opal_datatype_type_size(&data->recvtype[i]->super, &datatype_size); - if (datatype_size){ + if (datatype_size) { ret = MCA_PML_CALL(irecv(data->global_buf, 1, data->recvtype[i], @@ -1211,8 +948,7 @@ static int shuffle_init ( int index, int cycles, int aggregator, int rank, mca_i if(0 == block_index) { send_mem_address = (ptrdiff_t) (data->decoded_iov[data->iov_index].iov_base) + data->current_position; - } - else { + } else { // Reallocate more memory if blocklength_size is not enough if(0 == block_index % INIT_LEN) { blocklength_size += INIT_LEN; @@ -1223,17 +959,14 @@ static int shuffle_init ( int index, int cycles, int aggregator, int rank, mca_i data->current_position - send_mem_address; } - if (remaining >= - (data->decoded_iov[data->iov_index].iov_len - data->current_position)) { - + if (remaining >= (data->decoded_iov[data->iov_index].iov_len - data->current_position)) { blocklength_proc[block_index] = data->decoded_iov[data->iov_index].iov_len - data->current_position; remaining = remaining - (data->decoded_iov[data->iov_index].iov_len - data->current_position); data->iov_index = data->iov_index + 1; data->current_position = 0; - } - else { + } else { blocklength_proc[block_index] = remaining; data->current_position += remaining; remaining = 0; @@ -1268,77 +1001,23 @@ static int shuffle_init ( int index, int cycles, int aggregator, int rank, mca_i } } -#if DEBUG_ON - if (aggregator == rank){ - printf("************Cycle: %d, Aggregator: %d ***************\n", - index+1,rank); - for (i=0 ; iglobal_buf)[i]); - } -#endif - -//#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN -// end_comm_time = MPI_Wtime(); -// comm_time += (end_comm_time - start_comm_time); -//#endif /********************************************************** *** 7f. 
Create the io array, and pass it to fbtl *********************************************************/ - - if (aggregator == rank && entries_per_aggregator>0) { - - - data->io_array = (mca_common_ompio_io_array_t *) malloc - (entries_per_aggregator * sizeof (mca_common_ompio_io_array_t)); + if (aggregator == rank && entries_per_aggregator > 0) { + data->io_array = (mca_common_ompio_io_array_t *) malloc (entries_per_aggregator * + sizeof (mca_common_ompio_io_array_t)); if (NULL == data->io_array) { opal_output(1, "OUT OF MEMORY\n"); ret = OMPI_ERR_OUT_OF_RESOURCE; goto exit; } - - data->num_io_entries = 0; - /*First entry for every aggregator*/ - data->io_array[0].offset = - (IOVBASE_TYPE *)(intptr_t)file_offsets_for_agg[sorted_file_offsets[0]].offset; - data->io_array[0].length = - file_offsets_for_agg[sorted_file_offsets[0]].length; - data->io_array[0].memory_address = - data->global_buf+memory_displacements[sorted_file_offsets[0]]; - data->num_io_entries++; - - for (i=1;iio_array[data->num_io_entries - 1].length += - file_offsets_for_agg[sorted_file_offsets[i]].length; - } - else { - data->io_array[data->num_io_entries].offset = - (IOVBASE_TYPE *)(intptr_t)file_offsets_for_agg[sorted_file_offsets[i]].offset; - data->io_array[data->num_io_entries].length = - file_offsets_for_agg[sorted_file_offsets[i]].length; - data->io_array[data->num_io_entries].memory_address = - data->global_buf+memory_displacements[sorted_file_offsets[i]]; - data->num_io_entries++; - } - - } - -#if DEBUG_ON - printf("*************************** %d\n", num_of_io_entries); - for (i=0 ; iio_array, &data->num_io_entries, entries_per_aggregator, + (char*)data->global_buf, file_offsets_for_agg, sorted_file_offsets, + memory_displacements, rank); } - + exit: free(sorted_file_offsets); free(file_offsets_for_agg); @@ -1346,10 +1025,10 @@ static int shuffle_init ( int index, int cycles, int aggregator, int rank, mca_i free(blocklength_proc); free(displs_proc); - return OMPI_SUCCESS; + return ret; } -static int mca_fcoll_vulcan_minmax ( ompio_file_t *fh, struct iovec *iov, int iov_count, int num_aggregators, long *new_stripe_size) +int mca_fcoll_vulcan_minmax (ompio_file_t *fh, struct iovec *iov, int iov_count, int num_aggregators, long *new_stripe_size) { long min, max, globalmin, globalmax; long stripe_size; @@ -1363,10 +1042,10 @@ static int mca_fcoll_vulcan_minmax ( ompio_file_t *fh, struct iovec *iov, int io max = 0; } fh->f_comm->c_coll->coll_allreduce ( &min, &globalmin, 1, MPI_LONG, MPI_MIN, - fh->f_comm, fh->f_comm->c_coll->coll_allreduce_module); + fh->f_comm, fh->f_comm->c_coll->coll_allreduce_module); fh->f_comm->c_coll->coll_allreduce ( &max, &globalmax, 1, MPI_LONG, MPI_MAX, - fh->f_comm, fh->f_comm->c_coll->coll_allreduce_module); + fh->f_comm, fh->f_comm->c_coll->coll_allreduce_module); stripe_size = (globalmax - globalmin)/num_aggregators; if ( (globalmax - globalmin) % num_aggregators ) { @@ -1377,8 +1056,6 @@ static int mca_fcoll_vulcan_minmax ( ompio_file_t *fh, struct iovec *iov, int io return OMPI_SUCCESS; } - - int mca_fcoll_vulcan_break_file_view ( struct iovec *mem_iov, int mem_count, struct iovec *file_iov, int file_count, @@ -1627,10 +1304,270 @@ int mca_fcoll_vulcan_get_configuration (ompio_file_t *fh, int num_io_procs, size return ret; } - + +void mca_fcoll_vulcan_calc_blocklen_disps (mca_io_ompio_aggregator_data *data, int aggregator, + int rank, size_t *bytes_comm) +{ + size_t bytes_tmp = *bytes_comm; + int blocks = 0; + int j; + + /* The blocklen and displs calculation only done at aggregators */ + 
while (data->bytes_to_write_in_cycle) { + + /* This next block identifies which process is the holder + ** of the sorted[current_index] element; + */ + blocks = data->fview_count[0]; + for (j = 0 ; j < data->procs_per_group ; j++) { + if (data->sorted[data->current_index] < blocks) { + data->n = j; + break; + } else { + blocks += data->fview_count[j+1]; + } + } + + if (data->bytes_remaining) { + /* Finish up a partially used buffer from the previous cycle */ + + if (data->bytes_remaining <= data->bytes_to_write_in_cycle) { + /* The data fits completely into the block */ + if (aggregator == rank) { + data->blocklen_per_process[data->n][data->disp_index[data->n]] = data->bytes_remaining; + data->displs_per_process[data->n][data->disp_index[data->n]] = + (ptrdiff_t)data->global_iov_array[data->sorted[data->current_index]].iov_base + + (data->global_iov_array[data->sorted[data->current_index]].iov_len + - data->bytes_remaining); + + data->disp_index[data->n] += 1; + + /* In this cases the length is consumed so allocating for + next displacement and blocklength*/ + if (data->disp_index[data->n] == data->max_disp_index[data->n]) { + data->max_disp_index[data->n] *= 2; + + data->blocklen_per_process[data->n] = (int *) realloc + ((void *)data->blocklen_per_process[data->n], + (data->max_disp_index[data->n])*sizeof(int)); + data->displs_per_process[data->n] = (MPI_Aint *) realloc + ((void *)data->displs_per_process[data->n], + (data->max_disp_index[data->n])*sizeof(MPI_Aint)); + } + data->blocklen_per_process[data->n][data->disp_index[data->n]] = 0; + data->displs_per_process[data->n][data->disp_index[data->n]] = 0; + } + if (data->procs_in_group[data->n] == rank) { + bytes_tmp += data->bytes_remaining; + } + data->current_index ++; + data->bytes_to_write_in_cycle -= data->bytes_remaining; + data->bytes_remaining = 0; + } else { + /* the remaining data from the previous cycle is larger than the + data->bytes_to_write_in_cycle, so we have to segment again */ + if (aggregator == rank) { + data->blocklen_per_process[data->n][data->disp_index[data->n]] = data->bytes_to_write_in_cycle; + data->displs_per_process[data->n][data->disp_index[data->n]] = + (ptrdiff_t)data->global_iov_array[data->sorted[data->current_index]].iov_base + + (data->global_iov_array[data->sorted[data->current_index]].iov_len + - data->bytes_remaining); + data->disp_index[data->n] += 1; + } + + if (data->procs_in_group[data->n] == rank) { + bytes_tmp += data->bytes_to_write_in_cycle; + } + data->bytes_remaining -= data->bytes_to_write_in_cycle; + data->bytes_to_write_in_cycle = 0; + break; + } + } else { + /* No partially used entry available, have to start a new one */ + if (data->bytes_to_write_in_cycle < + (MPI_Aint) data->global_iov_array[data->sorted[data->current_index]].iov_len) { + /* This entry has more data than we can sendin one cycle */ + if (aggregator == rank) { + data->blocklen_per_process[data->n][data->disp_index[data->n]] = data->bytes_to_write_in_cycle; + data->displs_per_process[data->n][data->disp_index[data->n]] = + (ptrdiff_t)data->global_iov_array[data->sorted[data->current_index]].iov_base ; + data->disp_index[data->n] += 1; + } + if (data->procs_in_group[data->n] == rank) { + bytes_tmp += data->bytes_to_write_in_cycle; + } + data->bytes_remaining = data->global_iov_array[data->sorted[data->current_index]].iov_len - + data->bytes_to_write_in_cycle; + data->bytes_to_write_in_cycle = 0; + break; + } else { + /* Next data entry is less than data->bytes_to_write_in_cycle */ + if (aggregator == rank) { + 
data->blocklen_per_process[data->n][data->disp_index[data->n]] = + data->global_iov_array[data->sorted[data->current_index]].iov_len; + data->displs_per_process[data->n][data->disp_index[data->n]] = (ptrdiff_t) + data->global_iov_array[data->sorted[data->current_index]].iov_base; + + data->disp_index[data->n] += 1; + + /* realloc for next blocklength and assign this displacement + ** and check for next displs as the total length of this entry + ** has been consumed */ + if (data->disp_index[data->n] == data->max_disp_index[data->n] ) { + data->max_disp_index[data->n] *=2 ; + data->blocklen_per_process[data->n] = (int *) realloc ( + (void *)data->blocklen_per_process[data->n], + (data->max_disp_index[data->n]*sizeof(int))); + data->displs_per_process[data->n] = (MPI_Aint *)realloc ( + (void *)data->displs_per_process[data->n], + (data->max_disp_index[data->n]*sizeof(MPI_Aint))); + } + data->blocklen_per_process[data->n][data->disp_index[data->n]] = 0; + data->displs_per_process[data->n][data->disp_index[data->n]] = 0; + } + if (data->procs_in_group[data->n] == rank) { + bytes_tmp += data->global_iov_array[data->sorted[data->current_index]].iov_len; + } + data->bytes_to_write_in_cycle -= + data->global_iov_array[data->sorted[data->current_index]].iov_len; + data->current_index ++; + } + } + } + + *bytes_comm = bytes_tmp; +} + +int mca_fcoll_vulcan_calc_file_offsets(mca_io_ompio_aggregator_data *data, mca_io_ompio_local_io_array *file_offsets_for_agg, + int *sorted_file_offsets, MPI_Aint *memory_displacements, int entries_per_aggregator, + int rank, int index) +{ + int *temp_disp_index; + int temp_index = 0; + int temp_pindex; + int i, j; + + /* Moving file offsets to an IO array */ + for (i = 0; i < data->procs_per_group; i++){ + for(j = 0; j < data->disp_index[i];j++){ + if (data->blocklen_per_process[i][j] > 0){ + file_offsets_for_agg[temp_index].length = + data->blocklen_per_process[i][j]; + file_offsets_for_agg[temp_index].process_id = i; + file_offsets_for_agg[temp_index].offset = + data->displs_per_process[i][j]; + temp_index++; + } + } + } + + /* Sort the displacements for each aggregator */ + local_heap_sort (file_offsets_for_agg, entries_per_aggregator, + sorted_file_offsets); + + /* create contiguous memory displacements based on blocklens + ** on the same displs array and map it to this aggregator's actual + ** file-displacements */ + memory_displacements[sorted_file_offsets[0]] = 0; + for (i = 1; i < entries_per_aggregator; i++){ + memory_displacements[sorted_file_offsets[i]] = + memory_displacements[sorted_file_offsets[i-1]] + + file_offsets_for_agg[sorted_file_offsets[i-1]].length; + } + + temp_disp_index = (int *)calloc (1, data->procs_per_group * sizeof (int)); + if (NULL == temp_disp_index) { + opal_output (1, "OUT OF MEMORY\n"); + return OMPI_ERR_OUT_OF_RESOURCE; + } + + /* Now update the displacements array with memory offsets */ + for (i = 0; i < entries_per_aggregator;i++) { + temp_pindex = file_offsets_for_agg[sorted_file_offsets[i]].process_id; + data->displs_per_process[temp_pindex][temp_disp_index[temp_pindex]] = + memory_displacements[sorted_file_offsets[i]]; + if (temp_disp_index[temp_pindex] < data->disp_index[temp_pindex]) { + temp_disp_index[temp_pindex] += 1; + } + else { + printf("temp_disp_index[%d]: %d is greater than disp_index[%d]: %d\n", + temp_pindex, temp_disp_index[temp_pindex], + temp_pindex, data->disp_index[temp_pindex]); + } + } + + free(temp_disp_index); + +#if DEBUG_ON + printf("************Cycle: %d, Aggregator: %d ***************\n", + index+1, 
rank); + for (i = 0; i < data->procs_per_group; i++){ + for(j = 0; j < data->disp_index[i]; j++){ + if (data->blocklen_per_process[i][j] > 0){ + printf("%d communicate blocklen[%d]: %d, disp[%d]: %ld to %d\n", + data->procs_in_group[i],j, + data->blocklen_per_process[i][j],j, + data->displs_per_process[i][j], + rank); + } + } + } + printf("************Cycle: %d, Aggregator: %d ***************\n", + index+1, rank); + for (i = 0; i < entries_per_aggregator;i++){ + printf("%d: OFFSET: %lld LENGTH: %ld, Mem-offset: %ld\n", + file_offsets_for_agg[sorted_file_offsets[i]].process_id, + file_offsets_for_agg[sorted_file_offsets[i]].offset, + file_offsets_for_agg[sorted_file_offsets[i]].length, + memory_displacements[sorted_file_offsets[i]]); + } +#endif + + return OMPI_SUCCESS; +} + +void mca_fcoll_vulcan_calc_io_array(mca_common_ompio_io_array_t *io_array, int *num_io_entries, int max_io_entries, + char *global_buf, mca_io_ompio_local_io_array *file_offsets_for_agg, + int *sorted_offsets, MPI_Aint *memory_displacements, int rank) +{ + int i; + int num_entries; + + /* First entry for every aggregator */ + io_array[0].offset = (IOVBASE_TYPE *)(intptr_t)file_offsets_for_agg[sorted_offsets[0]].offset; + io_array[0].length = file_offsets_for_agg[sorted_offsets[0]].length; + io_array[0].memory_address = global_buf + memory_displacements[sorted_offsets[0]]; + num_entries = 1; + + /* If the entries are contiguous merge them, else add a new entry */ + for (i = 1; i < max_io_entries; i++) { + if (file_offsets_for_agg[sorted_offsets[i-1]].offset + + file_offsets_for_agg[sorted_offsets[i-1]].length == + file_offsets_for_agg[sorted_offsets[i]].offset) { + io_array[num_entries - 1].length += file_offsets_for_agg[sorted_offsets[i]].length; + } else { + io_array[num_entries].offset = (IOVBASE_TYPE *)(intptr_t)file_offsets_for_agg[sorted_offsets[i]].offset; + io_array[num_entries].length = file_offsets_for_agg[sorted_offsets[i]].length; + io_array[num_entries].memory_address = global_buf + memory_displacements[sorted_offsets[i]]; + num_entries++; + } + } + + *num_io_entries = num_entries; +#if DEBUG_ON + printf("*************************** %d\n", num_entries); + for (i = 0; i < num_entries; i++) { + printf(" AGGREGATOR %d ADDRESS: %p OFFSET: %ld LENGTH: %ld\n", + rank, io_array[i].memory_address, + (ptrdiff_t)io_array[i].offset, + io_array[i].length); + } +#endif +} + static int local_heap_sort (mca_io_ompio_local_io_array *io_array, - int num_entries, - int *sorted) + int num_entries, + int *sorted) { int i = 0; int j = 0; From 030ead1eec4ce8ad11b597e10faaff5515f5e904 Mon Sep 17 00:00:00 2001 From: Edgar Gabriel Date: Tue, 8 Oct 2024 13:33:00 -0500 Subject: [PATCH 3/3] fcoll/vulcan: add read_all implementation add an implementation of the read_all operation that uses the two-phase I/O algorithm using even partitioning, i.e. the same base idea that is used by the write_all operation of this component. In addition to using the 'correct' data partitioning approach for the component, the vulcan read_all implementation also adds some other features that were there for the write_all operations, but not for the (generic) read_all algorithm used by all components so far. Specifically, it can overlap the execution of the I/O phase and the communication phase. The algorithm can also use GPU buffers for aggregation. 
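To make the overlap of the I/O phase and the communication phase concrete, the following is a minimal double-buffered pipeline sketch: while the data read for cycle i-1 is being scattered to the processes, the read for cycle i has already been posted. The post_read/wait_read/exchange helpers and NUM_CYCLES are placeholders for this illustration, not OMPIO functions; in vulcan they correspond roughly to the read_init/shuffle_init steps and the matching waits.

#include <stdio.h>

#define NUM_CYCLES 4

/* Placeholder helpers standing in for the asynchronous file read posted to
 * the fbtl and for the shuffle (communication) phase. */
static void post_read (int cycle, int buf) { printf("post read  cycle %d into buffer %d\n", cycle, buf); }
static void wait_read (int cycle, int buf) { printf("wait read  cycle %d buffer %d\n", cycle, buf); }
static void exchange  (int cycle, int buf) { printf("scatter    cycle %d from buffer %d\n", cycle, buf); }

int main(void)
{
    /* Two aggregation buffers: while buffer (i % 2) is being filled by the
     * file system for cycle i, buffer ((i-1) % 2) holds the previous cycle's
     * data and is scattered to the processes, so I/O and communication
     * overlap. */
    post_read(0, 0);
    for (int i = 1; i < NUM_CYCLES; i++) {
        post_read(i, i % 2);               /* start I/O for the next cycle   */
        wait_read(i - 1, (i - 1) % 2);     /* finish I/O for the previous one */
        exchange(i - 1, (i - 1) % 2);      /* communicate previous cycle's data */
    }
    wait_read(NUM_CYCLES - 1, (NUM_CYCLES - 1) % 2);
    exchange(NUM_CYCLES - 1, (NUM_CYCLES - 1) % 2);
    return 0;
}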
Signed-off-by: Edgar Gabriel --- ompi/mca/fcoll/vulcan/Makefile.am | 2 + ompi/mca/fcoll/vulcan/fcoll_vulcan.h | 1 + .../mca/fcoll/vulcan/fcoll_vulcan_component.c | 1 + .../fcoll/vulcan/fcoll_vulcan_file_read_all.c | 906 +++++++++++++++++- .../vulcan/fcoll_vulcan_file_write_all.c | 84 +- ompi/mca/fcoll/vulcan/fcoll_vulcan_internal.h | 93 ++ 6 files changed, 1008 insertions(+), 79 deletions(-) create mode 100644 ompi/mca/fcoll/vulcan/fcoll_vulcan_internal.h diff --git a/ompi/mca/fcoll/vulcan/Makefile.am b/ompi/mca/fcoll/vulcan/Makefile.am index e805880a661..c4680544abb 100644 --- a/ompi/mca/fcoll/vulcan/Makefile.am +++ b/ompi/mca/fcoll/vulcan/Makefile.am @@ -13,6 +13,7 @@ # Copyright (c) 2012 Cisco Systems, Inc. All rights reserved. # Copyright (c) 2018 Research Organization for Information Science # and Technology (RIST). All rights reserved. +# Copyright (c) 2024 Advanced Micro Devices, Inc. All rights reserved. # $COPYRIGHT$ # # Additional copyrights may follow @@ -22,6 +23,7 @@ sources = \ fcoll_vulcan.h \ + fcoll_vulcan_internal.h \ fcoll_vulcan_module.c \ fcoll_vulcan_component.c \ fcoll_vulcan_file_read_all.c \ diff --git a/ompi/mca/fcoll/vulcan/fcoll_vulcan.h b/ompi/mca/fcoll/vulcan/fcoll_vulcan.h index fc94b582cfa..3165a0b0797 100644 --- a/ompi/mca/fcoll/vulcan/fcoll_vulcan.h +++ b/ompi/mca/fcoll/vulcan/fcoll_vulcan.h @@ -14,6 +14,7 @@ * and Technology (RIST). All rights reserved. * Copyright (c) 2024 Triad National Security, LLC. All rights * reserved. + * Copyright (c) 2024 Advanced Micro Devices, Inc. All rights reserved. * $COPYRIGHT$ * * Additional copyrights may follow diff --git a/ompi/mca/fcoll/vulcan/fcoll_vulcan_component.c b/ompi/mca/fcoll/vulcan/fcoll_vulcan_component.c index 18e59832ceb..5fc8254f164 100644 --- a/ompi/mca/fcoll/vulcan/fcoll_vulcan_component.c +++ b/ompi/mca/fcoll/vulcan/fcoll_vulcan_component.c @@ -16,6 +16,7 @@ * reserved. * Copyright (c) 2024 Triad National Security, LLC. All rights * reserved. + * Copyright (c) 2024 Advanced Micro Devices, Inc. All rights reserved. * $COPYRIGHT$ * * Additional copyrights may follow diff --git a/ompi/mca/fcoll/vulcan/fcoll_vulcan_file_read_all.c b/ompi/mca/fcoll/vulcan/fcoll_vulcan_file_read_all.c index c372e9f14b4..f6a492e621c 100644 --- a/ompi/mca/fcoll/vulcan/fcoll_vulcan_file_read_all.c +++ b/ompi/mca/fcoll/vulcan/fcoll_vulcan_file_read_all.c @@ -1,3 +1,4 @@ +/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */ /* * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana * University Research and Technology @@ -14,6 +15,7 @@ * and Technology (RIST). All rights reserved. * Copyright (c) 2024 Triad National Security, LLC. All rights * reserved. + * Copyright (c) 2024 Advanced Micro Devices, Inc. All rights reserved. 
* $COPYRIGHT$ * * Additional copyrights may follow @@ -23,10 +25,31 @@ #include "ompi_config.h" #include "fcoll_vulcan.h" +#include "fcoll_vulcan_internal.h" #include "mpi.h" +#include "ompi/constants.h" +#include "ompi/mca/fcoll/fcoll.h" +#include "ompi/mca/fcoll/base/fcoll_base_coll_array.h" #include "ompi/mca/common/ompio/common_ompio.h" +#include "ompi/mca/common/ompio/common_ompio_buffer.h" +#include "ompi/mca/io/io.h" +#include "ompi/mca/common/ompio/common_ompio_request.h" +#include "math.h" +#include "ompi/mca/pml/pml.h" +#include "opal/mca/accelerator/accelerator.h" +#include +#define DEBUG_ON 0 +#define NOT_AGGR_INDEX -1 + +static int shuffle_init (int index, int cycles, int aggregator, int rank, + mca_io_ompio_aggregator_data *data, ompi_request_t **reqs); + +static int read_init (ompio_file_t *fh, int index, int cycles, int aggregator, int rank, + mca_io_ompio_aggregator_data *aggr_data, + int read_syncType, ompi_request_t **request, + bool is_accelerator_buffer); int mca_fcoll_vulcan_file_read_all (struct ompio_file_t *fh, void *buf, @@ -34,7 +57,888 @@ int mca_fcoll_vulcan_file_read_all (struct ompio_file_t *fh, struct ompi_datatype_t *datatype, ompi_status_public_t *status) { - return mca_common_ompio_base_file_read_all (fh, buf, count, datatype, status); + int index = 0; + int cycles = 0; + int ret =0, l, i, j, bytes_per_cycle; + uint32_t iov_count = 0; + struct iovec *decoded_iov = NULL; + struct iovec *local_iov_array=NULL; + uint32_t total_fview_count = 0; + int local_count = 0; + ompi_request_t **reqs = NULL; + ompi_request_t *req_iread = MPI_REQUEST_NULL; + ompi_request_t *req_tmp = MPI_REQUEST_NULL; + mca_io_ompio_aggregator_data **aggr_data=NULL; + + ptrdiff_t *displs = NULL; + int vulcan_num_io_procs; + size_t max_data = 0; + + struct iovec **broken_iov_arrays=NULL; + struct iovec **broken_decoded_iovs=NULL; + int *broken_counts=NULL; + int *broken_iov_counts=NULL; + MPI_Aint *broken_total_lengths=NULL; + + int aggr_index = NOT_AGGR_INDEX; + int read_sync_type = 2; + int *result_counts=NULL; + + ompi_count_array_t fview_count_desc; + ompi_disp_array_t displs_desc; + int is_gpu, is_managed; + bool use_accelerator_buffer = false; + +#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN + double read_time = 0.0, start_read_time = 0.0, end_read_time = 0.0; + double comm_time = 0.0, start_comm_time = 0.0, end_comm_time = 0.0; + double exch_read = 0.0, start_exch = 0.0, end_exch = 0.0; + mca_common_ompio_print_entry nentry; +#endif + + vulcan_num_io_procs = fh->f_get_mca_parameter_value ( "num_aggregators", strlen ("num_aggregators")); + if (OMPI_ERR_MAX == vulcan_num_io_procs) { + ret = OMPI_ERROR; + goto exit; + } + bytes_per_cycle = fh->f_bytes_per_agg; + + if ((1 == mca_fcoll_vulcan_async_io) && (NULL == fh->f_fbtl->fbtl_ipreadv)) { + opal_output (1, "vulcan_read_all: fbtl Does NOT support ipreadv() (asynchronous read) \n"); + ret = MPI_ERR_UNSUPPORTED_OPERATION; + goto exit; + } + + mca_common_ompio_check_gpu_buf (fh, buf, &is_gpu, &is_managed); + if (is_gpu && !is_managed && + fh->f_get_mca_parameter_value ("use_accelerator_buffers", strlen("use_accelerator_buffers"))) { + use_accelerator_buffer = true; + } + /* since we want to overlap 2 iterations, define the bytes_per_cycle to be half of what + the user requested */ + bytes_per_cycle = bytes_per_cycle/2; + + /************************************************************************** + ** 1. 
Decode user buffer into an iovec + **************************************************************************/ + ret = mca_common_ompio_decode_datatype ((struct ompio_file_t *) fh, + datatype, count, buf, &max_data, + fh->f_mem_convertor, &decoded_iov, + &iov_count); + if (OMPI_SUCCESS != ret){ + goto exit; + } + + if (MPI_STATUS_IGNORE != status) { + status->_ucount = max_data; + } + + ret = mca_fcoll_vulcan_get_configuration (fh, vulcan_num_io_procs, max_data); + if (OMPI_SUCCESS != ret){ + goto exit; + } + opal_output_verbose(10, ompi_fcoll_base_framework.framework_output, + "Using %d aggregators for the read_all operation \n", fh->f_num_aggrs); + + aggr_data = (mca_io_ompio_aggregator_data **) malloc (fh->f_num_aggrs * + sizeof(mca_io_ompio_aggregator_data*)); + if (NULL == aggr_data) { + ret = OMPI_ERR_OUT_OF_RESOURCE; + goto exit; + } + + for (i = 0; i < fh->f_num_aggrs; i++) { + // At this point we know the number of aggregators. If there is a correlation between + // number of aggregators and number of IO nodes, we know how many aggr_data arrays we need + // to allocate. + aggr_data[i] = (mca_io_ompio_aggregator_data *) calloc (1, sizeof(mca_io_ompio_aggregator_data)); + aggr_data[i]->procs_per_group = fh->f_procs_per_group; + aggr_data[i]->procs_in_group = fh->f_procs_in_group; + aggr_data[i]->comm = fh->f_comm; + // Identify if the process is an aggregator. + // If so, aggr_index would be its index in "aggr_data" and "aggregators" arrays. + if (fh->f_aggr_list[i] == fh->f_rank) { + aggr_index = i; + } + } + + /********************************************************************* + *** 2. Generate the local offsets/lengths array corresponding to + *** this read operation + ********************************************************************/ + ret = fh->f_generate_current_file_view ((struct ompio_file_t *) fh, + max_data, &local_iov_array, + &local_count); + if (ret != OMPI_SUCCESS) { + goto exit; + } + + /************************************************************************* + ** 2b. Separate the local_iov_array entries based on the number of aggregators + *************************************************************************/ + // Modifications for the even distribution: + long domain_size; + ret = mca_fcoll_vulcan_minmax (fh, local_iov_array, local_count, fh->f_num_aggrs, &domain_size); + + // broken_iov_arrays[0] contains broken_counts[0] entries to aggregator 0, + // broken_iov_arrays[1] contains broken_counts[1] entries to aggregator 1, etc. + ret = mca_fcoll_vulcan_break_file_view (decoded_iov, iov_count, + local_iov_array, local_count, + &broken_decoded_iovs, &broken_iov_counts, + &broken_iov_arrays, &broken_counts, + &broken_total_lengths, + fh->f_num_aggrs, domain_size); + + /************************************************************************** + ** 3. Determine the total amount of data to be read and no. 
of cycles + **************************************************************************/ +#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN + start_comm_time = MPI_Wtime(); +#endif + ret = fh->f_comm->c_coll->coll_allreduce (MPI_IN_PLACE, broken_total_lengths, + fh->f_num_aggrs, MPI_LONG, MPI_SUM, + fh->f_comm, + fh->f_comm->c_coll->coll_allreduce_module); + if (OMPI_SUCCESS != ret) { + goto exit; + } + +#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN + end_comm_time = MPI_Wtime(); + comm_time += (end_comm_time - start_comm_time); +#endif + + cycles=0; + for (i = 0; i < fh->f_num_aggrs; i++) { +#if DEBUG_ON + printf("%d: Overall broken_total_lengths[%d] = %ld\n", fh->f_rank, i, broken_total_lengths[i]); +#endif + if (ceil((double)broken_total_lengths[i]/bytes_per_cycle) > cycles) { + cycles = ceil((double)broken_total_lengths[i]/bytes_per_cycle); + } + } + + result_counts = (int *) malloc (fh->f_num_aggrs * fh->f_procs_per_group * sizeof(int)); + if (NULL == result_counts) { + ret = OMPI_ERR_OUT_OF_RESOURCE; + goto exit; + } + +#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN + start_comm_time = MPI_Wtime(); +#endif + ret = fh->f_comm->c_coll->coll_allgather (broken_counts, fh->f_num_aggrs, MPI_INT, + result_counts, fh->f_num_aggrs, MPI_INT, + fh->f_comm, + fh->f_comm->c_coll->coll_allgather_module); + if (OMPI_SUCCESS != ret) { + goto exit; + } +#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN + end_comm_time = MPI_Wtime(); + comm_time += (end_comm_time - start_comm_time); +#endif + + /************************************************************* + *** 4. Allgather the offset/lengths array from all processes + *************************************************************/ + for (i = 0; i < fh->f_num_aggrs; i++) { + aggr_data[i]->total_bytes = broken_total_lengths[i]; + aggr_data[i]->decoded_iov = broken_decoded_iovs[i]; + aggr_data[i]->fview_count = (size_t *)malloc (fh->f_procs_per_group * sizeof (size_t)); + if (NULL == aggr_data[i]->fview_count) { + opal_output (1, "OUT OF MEMORY\n"); + ret = OMPI_ERR_OUT_OF_RESOURCE; + goto exit; + } + + for (j = 0; j < fh->f_procs_per_group; j++) { + aggr_data[i]->fview_count[j] = result_counts[fh->f_num_aggrs*j+i]; + } + + displs = (ptrdiff_t *)malloc (fh->f_procs_per_group * sizeof (ptrdiff_t)); + if (NULL == displs) { + opal_output (1, "OUT OF MEMORY\n"); + ret = OMPI_ERR_OUT_OF_RESOURCE; + goto exit; + } + + displs[0] = 0; + total_fview_count = (uint32_t) aggr_data[i]->fview_count[0]; + for (j = 1 ; j < fh->f_procs_per_group ; j++) { + total_fview_count += aggr_data[i]->fview_count[j]; + displs[j] = displs[j-1] + aggr_data[i]->fview_count[j-1]; + } + +#if DEBUG_ON + printf("total_fview_count : %d\n", total_fview_count); + if (fh->f_aggr_list[i] == fh->f_rank) { + for (j=0 ; j<fh->f_procs_per_group ; j++) { + printf ("%d: PROCESS: %d ELEMENTS: %ld DISPLS: %ld\n", + fh->f_rank, j, + aggr_data[i]->fview_count[j], + displs[j]); + } + } +#endif + + /* allocate the global iovec */ + if (0 != total_fview_count) { + aggr_data[i]->global_iov_array = (struct iovec*) malloc (total_fview_count * + sizeof(struct iovec)); + if (NULL == aggr_data[i]->global_iov_array) { + opal_output(1, "OUT OF MEMORY\n"); + ret = OMPI_ERR_OUT_OF_RESOURCE; + goto exit; + } + } + +#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN + start_comm_time = MPI_Wtime(); +#endif + OMPI_COUNT_ARRAY_INIT(&fview_count_desc, aggr_data[i]->fview_count); + OMPI_DISP_ARRAY_INIT(&displs_desc, displs); + ret = fh->f_comm->c_coll->coll_allgatherv (broken_iov_arrays[i], + broken_counts[i], + fh->f_iov_type, + aggr_data[i]->global_iov_array, + fview_count_desc, +
displs_desc, + fh->f_iov_type, + fh->f_comm, + fh->f_comm->c_coll->coll_allgatherv_module ); + if (OMPI_SUCCESS != ret) { + goto exit; + } + +#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN + end_comm_time = MPI_Wtime(); + comm_time += (end_comm_time - start_comm_time); +#endif + + /**************************************************************************************** + *** 5. Sort the global offset/lengths list based on the offsets. + *** The result of the sort operation is the 'sorted', an integer array, + *** which contains the indexes of the global_iov_array based on the offset. + *** For example, if global_iov_array[x].offset is followed by global_iov_array[y].offset + *** in the file, and that one is followed by global_iov_array[z].offset, than + *** sorted[0] = x, sorted[1]=y and sorted[2]=z; + ******************************************************************************************/ + if (0 != total_fview_count) { + aggr_data[i]->sorted = (int *)malloc (total_fview_count * sizeof(int)); + if (NULL == aggr_data[i]->sorted) { + opal_output (1, "OUT OF MEMORY\n"); + ret = OMPI_ERR_OUT_OF_RESOURCE; + goto exit; + } + ompi_fcoll_base_sort_iovec (aggr_data[i]->global_iov_array, total_fview_count, + aggr_data[i]->sorted); + } + + if (NULL != local_iov_array) { + free(local_iov_array); + local_iov_array = NULL; + } + + if (NULL != displs) { + free(displs); + displs=NULL; + } + +#if DEBUG_ON + if (fh->f_aggr_list[i] == fh->f_rank) { + uint32_t tv=0; + for (tv = 0 ; tv < total_fview_count ; tv++) { + printf("%d: OFFSET: %lu LENGTH: %ld\n", + fh->f_rank, + (uint64_t)aggr_data[i]->global_iov_array[aggr_data[i]->sorted[tv]].iov_base, + aggr_data[i]->global_iov_array[aggr_data[i]->sorted[tv]].iov_len); + } + } +#endif + /************************************************************* + *** 6. 
Determine the number of cycles required to execute this + *** operation + *************************************************************/ + aggr_data[i]->bytes_per_cycle = bytes_per_cycle; + + if (fh->f_aggr_list[i] == fh->f_rank) { + aggr_data[i]->disp_index = (int *)malloc (fh->f_procs_per_group * sizeof (int)); + if (NULL == aggr_data[i]->disp_index) { + opal_output (1, "OUT OF MEMORY\n"); + ret = OMPI_ERR_OUT_OF_RESOURCE; + goto exit; + } + + aggr_data[i]->max_disp_index = (int *)calloc (fh->f_procs_per_group, sizeof (int)); + if (NULL == aggr_data[i]->max_disp_index) { + opal_output (1, "OUT OF MEMORY\n"); + ret = OMPI_ERR_OUT_OF_RESOURCE; + goto exit; + } + + aggr_data[i]->blocklen_per_process = (int **)calloc (fh->f_procs_per_group, sizeof (int*)); + if (NULL == aggr_data[i]->blocklen_per_process) { + opal_output (1, "OUT OF MEMORY\n"); + ret = OMPI_ERR_OUT_OF_RESOURCE; + goto exit; + } + + aggr_data[i]->displs_per_process = (MPI_Aint **)calloc (fh->f_procs_per_group, sizeof (MPI_Aint*)); + if (NULL == aggr_data[i]->displs_per_process) { + opal_output (1, "OUT OF MEMORY\n"); + ret = OMPI_ERR_OUT_OF_RESOURCE; + goto exit; + } + + if (use_accelerator_buffer) { + opal_output_verbose(10, ompi_fcoll_base_framework.framework_output, + "Allocating GPU device buffer for aggregation\n"); + ret = opal_accelerator.mem_alloc(MCA_ACCELERATOR_NO_DEVICE_ID, (void**)&aggr_data[i]->global_buf, + bytes_per_cycle); + if (OPAL_SUCCESS != ret) { + opal_output(1, "Could not allocate accelerator memory"); + ret = OMPI_ERR_OUT_OF_RESOURCE; + goto exit; + } + ret = opal_accelerator.mem_alloc(MCA_ACCELERATOR_NO_DEVICE_ID, (void**)&aggr_data[i]->prev_global_buf, + bytes_per_cycle); + if (OPAL_SUCCESS != ret) { + opal_output(1, "Could not allocate accelerator memory"); + ret = OMPI_ERR_OUT_OF_RESOURCE; + goto exit; + } + } else { + aggr_data[i]->global_buf = (char *) malloc (bytes_per_cycle); + aggr_data[i]->prev_global_buf = (char *) malloc (bytes_per_cycle); + if (NULL == aggr_data[i]->global_buf || NULL == aggr_data[i]->prev_global_buf){ + opal_output(1, "OUT OF MEMORY"); + ret = OMPI_ERR_OUT_OF_RESOURCE; + goto exit; + } + } + + aggr_data[i]->recvtype = (ompi_datatype_t **) malloc (fh->f_procs_per_group * + sizeof(ompi_datatype_t *)); + aggr_data[i]->prev_recvtype = (ompi_datatype_t **) malloc (fh->f_procs_per_group * + sizeof(ompi_datatype_t *)); + if (NULL == aggr_data[i]->recvtype || NULL == aggr_data[i]->prev_recvtype) { + opal_output (1, "OUT OF MEMORY\n"); + ret = OMPI_ERR_OUT_OF_RESOURCE; + goto exit; + } + for(l=0;l<fh->f_procs_per_group;l++){ + aggr_data[i]->recvtype[l] = MPI_DATATYPE_NULL; + aggr_data[i]->prev_recvtype[l] = MPI_DATATYPE_NULL; + } + } + +#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN + start_exch = MPI_Wtime(); +#endif + } + + reqs = (ompi_request_t **)malloc ((fh->f_procs_per_group + 1 )*fh->f_num_aggrs *sizeof(ompi_request_t *)); + if (NULL == reqs) { + opal_output (1, "OUT OF MEMORY\n"); + ret = OMPI_ERR_OUT_OF_RESOURCE; + goto exit; + } + + for (l = 0, i = 0; i < fh->f_num_aggrs; i++) { + for (j=0; j< (fh->f_procs_per_group+1); j++) { + reqs[l] = MPI_REQUEST_NULL; + l++; + } + } + + if( (1 == mca_fcoll_vulcan_async_io) || + ( (0 == mca_fcoll_vulcan_async_io) && (NULL != fh->f_fbtl->fbtl_ipreadv) && (2 < cycles))) { + read_sync_type = 1; + } + + if (cycles > 0) { + if (NOT_AGGR_INDEX != aggr_index) { + // Register progress function that should be used by ompi_request_wait + mca_common_ompio_register_progress (); + } +#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN + start_read_time = MPI_Wtime(); +#endif +
for (i = 0; i < fh->f_num_aggrs; i++) { + ret = read_init (fh, 0, cycles, fh->f_aggr_list[i], fh->f_rank, + aggr_data[i], read_sync_type, &req_tmp, + use_accelerator_buffer); + if (OMPI_SUCCESS != ret) { + goto exit; + } + if (fh->f_aggr_list[i] == fh->f_rank) { + req_iread = req_tmp; + } + } + + if (NOT_AGGR_INDEX != aggr_index) { + ret = ompi_request_wait(&req_iread, MPI_STATUS_IGNORE); + if (OMPI_SUCCESS != ret){ + goto exit; + } + } +#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN + end_read_time = MPI_Wtime(); + read_time += end_read_time - start_read_time; +#endif + } + + for (index = 1; index < cycles; index++) { + for (i = 0; i < fh->f_num_aggrs; i++) { + ret = shuffle_init (index-1, cycles, fh->f_aggr_list[i], fh->f_rank, aggr_data[i], + &reqs[i*(fh->f_procs_per_group + 1)] ); + if (OMPI_SUCCESS != ret) { + goto exit; + } + } + + SWAP_AGGR_POINTERS(aggr_data, fh->f_num_aggrs); +#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN + start_read_time = MPI_Wtime(); +#endif + for (i = 0; i < fh->f_num_aggrs; i++) { + ret = read_init (fh, index, cycles, fh->f_aggr_list[i], fh->f_rank, + aggr_data[i], read_sync_type, + &req_tmp, use_accelerator_buffer); + if (OMPI_SUCCESS != ret){ + goto exit; + } + if (fh->f_aggr_list[i] == fh->f_rank) { + req_iread = req_tmp; + } + } +#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN + end_read_time = MPI_Wtime(); + read_time += end_read_time - start_read_time; +#endif + ret = ompi_request_wait_all ((fh->f_procs_per_group + 1 )*fh->f_num_aggrs, + reqs, MPI_STATUS_IGNORE); + if (OMPI_SUCCESS != ret){ + goto exit; + } + + if (NOT_AGGR_INDEX != aggr_index) { + ret = ompi_request_wait (&req_iread, MPI_STATUS_IGNORE); + if (OMPI_SUCCESS != ret){ + goto exit; + } + } + } /* end for (index = 1; index < cycles; index++) */ + + if (cycles > 0) { + for (i = 0; i < fh->f_num_aggrs; i++) { + ret = shuffle_init (index-1, cycles, fh->f_aggr_list[i], fh->f_rank, aggr_data[i], + &reqs[i*(fh->f_procs_per_group + 1)] ); + if (OMPI_SUCCESS != ret) { + goto exit; + } + } + ret = ompi_request_wait_all ((fh->f_procs_per_group + 1 )*fh->f_num_aggrs, + reqs, MPI_STATUS_IGNORE); + if (OMPI_SUCCESS != ret){ + goto exit; + } + } + +#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN + end_exch = MPI_Wtime(); + exch_read += end_exch - start_exch; + nentry.time[0] = read_time; + nentry.time[1] = comm_time; + nentry.time[2] = exch_read; + nentry.aggregator = 0; + for ( i=0; i<fh->f_num_aggrs; i++ ) { + if (fh->f_aggr_list[i] == fh->f_rank) + nentry.aggregator = 1; + } + nentry.nprocs_for_coll = fh->f_num_aggrs; + if (!mca_common_ompio_full_print_queue(fh->f_coll_read_time)){ + mca_common_ompio_register_print_entry(fh->f_coll_read_time, + nentry); + } +#endif + +exit : + if (NULL != aggr_data) { + + for (i = 0; i < fh->f_num_aggrs; i++) { + if (fh->f_aggr_list[i] == fh->f_rank) { + if (NULL != aggr_data[i]->recvtype){ + for (j = 0; j < aggr_data[i]->procs_per_group; j++) { + if (MPI_DATATYPE_NULL != aggr_data[i]->recvtype[j]) { + ompi_datatype_destroy(&aggr_data[i]->recvtype[j]); + } + if (MPI_DATATYPE_NULL != aggr_data[i]->prev_recvtype[j]) { + ompi_datatype_destroy(&aggr_data[i]->prev_recvtype[j]); + } + } + free(aggr_data[i]->recvtype); + free(aggr_data[i]->prev_recvtype); + } + + free (aggr_data[i]->disp_index); + free (aggr_data[i]->max_disp_index); + if (use_accelerator_buffer) { + opal_accelerator.mem_release(MCA_ACCELERATOR_NO_DEVICE_ID, aggr_data[i]->global_buf); + opal_accelerator.mem_release(MCA_ACCELERATOR_NO_DEVICE_ID, aggr_data[i]->prev_global_buf); + } else { + free (aggr_data[i]->global_buf); + free
(aggr_data[i]->prev_global_buf); + } + for (l = 0;l < aggr_data[i]->procs_per_group; l++) { + free (aggr_data[i]->blocklen_per_process[l]); + free (aggr_data[i]->displs_per_process[l]); + } + + free (aggr_data[i]->blocklen_per_process); + free (aggr_data[i]->displs_per_process); + } + free (aggr_data[i]->sorted); + free (aggr_data[i]->global_iov_array); + free (aggr_data[i]->fview_count); + free (aggr_data[i]->decoded_iov); + + free (aggr_data[i]); + } + free (aggr_data); + } + free(displs); + free(decoded_iov); + free(broken_counts); + free(broken_total_lengths); + free(broken_iov_counts); + free(broken_decoded_iovs); // decoded_iov arrays[i] were freed as aggr_data[i]->decoded_iov; + if (NULL != broken_iov_arrays) { + for (i = 0; i < fh->f_num_aggrs; i++) { + free(broken_iov_arrays[i]); + } + } + free(broken_iov_arrays); + free(fh->f_procs_in_group); + free(fh->f_aggr_list); + fh->f_procs_in_group=NULL; + fh->f_procs_per_group=0; + fh->f_aggr_list=NULL; + free(result_counts); + free(reqs); + + return ret; +} + +static int read_init (ompio_file_t *fh, int index, int cycles, int aggregator, int rank, + mca_io_ompio_aggregator_data *data, + int read_syncType, ompi_request_t **request, + bool is_accelerator_buffer) +{ + int ret = OMPI_SUCCESS; + ssize_t ret_temp = 0; + mca_ompio_request_t *ompio_req = NULL; + int i, j, l; + int entries_per_aggregator=0; + mca_io_ompio_local_io_array *file_offsets_for_agg=NULL; + MPI_Aint *memory_displacements=NULL; + int* blocklength_proc=NULL; + ptrdiff_t* displs_proc=NULL; + int *sorted_file_offsets=NULL; + + /********************************************************************** + *** 7a. Getting ready for next cycle: initializing and freeing buffers + **********************************************************************/ + data->bytes_sent = 0; + + if (aggregator == rank) { + if (NULL != data->recvtype){ + for (i = 0; i < data->procs_per_group; i++) { + if (MPI_DATATYPE_NULL != data->recvtype[i]) { + ompi_datatype_destroy(&data->recvtype[i]); + data->recvtype[i] = MPI_DATATYPE_NULL; + } + } + } + + for (l = 0; l < data->procs_per_group; l++) { + data->disp_index[l] = 0; + + if (data->max_disp_index[l] == 0) { + data->blocklen_per_process[l] = (int *) calloc (INIT_LEN, sizeof(int)); + data->displs_per_process[l] = (MPI_Aint *) calloc (INIT_LEN, sizeof(MPI_Aint)); + if (NULL == data->displs_per_process[l] || NULL == data->blocklen_per_process[l]){ + opal_output (1, "OUT OF MEMORY for displs\n"); + ret = OMPI_ERR_OUT_OF_RESOURCE; + goto exit; + } + data->max_disp_index[l] = INIT_LEN; + } else { + memset (data->blocklen_per_process[l], 0, data->max_disp_index[l]*sizeof(int)); + memset (data->displs_per_process[l], 0, data->max_disp_index[l]*sizeof(MPI_Aint)); + } + } + } /* rank == aggregator */ + + /************************************************************************** + *** 7b. 
Determine the number of bytes to be actually read in this cycle + **************************************************************************/ + int local_cycles= ceil((double)data->total_bytes / data->bytes_per_cycle); + if (index < (local_cycles -1)) { + data->bytes_to_write_in_cycle = data->bytes_per_cycle; + } else if ( index == (local_cycles -1)) { + data->bytes_to_write_in_cycle = data->total_bytes - data->bytes_per_cycle*index; + } else { + data->bytes_to_write_in_cycle = 0; + } + data->bytes_to_write = data->bytes_to_write_in_cycle; + +#if DEBUG_ON + if (aggregator == rank) { + printf ("****%d: CYCLE %d Bytes %d**********\n", + rank, index, data->bytes_to_write_in_cycle); + } +#endif + + /***************************************************************** + *** 7c. Calculate how much data will be sent to each process in + *** this cycle + *****************************************************************/ + mca_fcoll_vulcan_calc_blocklen_disps(data, aggregator, rank, &data->bytes_sent); + + /************************************************************************* + *** 7d. Calculate the displacement + *************************************************************************/ + if (rank == aggregator) { + for (i = 0; i < data->procs_per_group; i++){ + for (j = 0; j < data->disp_index[i]; j++){ + if (data->blocklen_per_process[i][j] > 0) + entries_per_aggregator++ ; + } + } + } +#if DEBUG_ON + if (aggregator == rank) { + printf("%d : Entries per aggregator : %d\n", rank, entries_per_aggregator); + } +#endif + + if (entries_per_aggregator > 0) { + file_offsets_for_agg = (mca_io_ompio_local_io_array *) malloc (entries_per_aggregator + * sizeof(mca_io_ompio_local_io_array)); + memory_displacements = (MPI_Aint *) malloc (entries_per_aggregator * sizeof(MPI_Aint)); + sorted_file_offsets = (int *) malloc (entries_per_aggregator*sizeof(int)); + if (NULL == file_offsets_for_agg || NULL == memory_displacements || + NULL == sorted_file_offsets) { + opal_output (1, "OUT OF MEMORY\n"); + ret = OMPI_ERR_OUT_OF_RESOURCE; + goto exit; + } + + ret = mca_fcoll_vulcan_calc_file_offsets(data, file_offsets_for_agg, sorted_file_offsets, + memory_displacements, entries_per_aggregator, + rank, index); + if (OMPI_SUCCESS != ret) { + goto exit; + } + + /********************************************************** + *** 7f. 
Create the io array + *********************************************************/ + fh->f_io_array = (mca_common_ompio_io_array_t *) malloc (entries_per_aggregator + * sizeof (mca_common_ompio_io_array_t)); + if (NULL == fh->f_io_array) { + opal_output(1, "OUT OF MEMORY\n"); + ret = OMPI_ERR_OUT_OF_RESOURCE; + goto exit; + } + + mca_fcoll_vulcan_calc_io_array(fh->f_io_array, &fh->f_num_of_io_entries, entries_per_aggregator, + (char*)data->global_buf, file_offsets_for_agg, sorted_file_offsets, + memory_displacements, rank); + } + + if (rank == aggregator && fh->f_num_of_io_entries) { + mca_common_ompio_request_alloc (&ompio_req, MCA_OMPIO_REQUEST_READ); + + if (1 == read_syncType) { + if (is_accelerator_buffer) { + ret = mca_common_ompio_file_iread_pregen(fh, (ompi_request_t *) ompio_req); + if (0 > ret) { + opal_output (1, "vulcan_read_all: mca_common_ompio_iread_pregen failed\n"); + ompio_req->req_ompi.req_status.MPI_ERROR = ret; + ompio_req->req_ompi.req_status._ucount = 0; + } + } else { + ret = fh->f_fbtl->fbtl_ipreadv(fh, (ompi_request_t *) ompio_req); + if (0 > ret) { + opal_output (1, "vulcan_read_all: fbtl_ipreadv failed\n"); + ompio_req->req_ompi.req_status.MPI_ERROR = ret; + ompio_req->req_ompi.req_status._ucount = 0; + } + } + } + else { + ret_temp = fh->f_fbtl->fbtl_preadv(fh); + if (0 > ret_temp) { + opal_output (1, "vulcan_read_all: fbtl_preadv failed\n"); + ret = ret_temp; + ret_temp = 0; + } + + ompio_req->req_ompi.req_status.MPI_ERROR = ret; + ompio_req->req_ompi.req_status._ucount = ret_temp; + ompi_request_complete (&ompio_req->req_ompi, false); + } + + free(fh->f_io_array); + } + +#if DEBUG_ON + printf("************Cycle: %d, Aggregator: %d ***************\n", + index, rank); + for (i = 0; i < data->procs_per_group; i++) { + for (j = 0; j < data->disp_index[i]; j++) { + if (data->blocklen_per_process[i][j] > 0) { + printf("%d sends blocklen[%d]: %d, disp[%d]: %ld to %d\n", + data->procs_in_group[i],j, + data->blocklen_per_process[i][j],j, + data->displs_per_process[i][j], rank); + } + } + } +#endif + +exit: + free(sorted_file_offsets); + free(file_offsets_for_agg); + free(memory_displacements); + free(blocklength_proc); + free(displs_proc); + + fh->f_io_array = NULL; + fh->f_num_of_io_entries = 0; + + *request = (ompi_request_t *) ompio_req; + return ret; } +static int shuffle_init (int index, int cycles, int aggregator, int rank, mca_io_ompio_aggregator_data *data, + ompi_request_t **reqs) +{ + int i, ret = OMPI_SUCCESS; + int* blocklength_proc=NULL; + ptrdiff_t* displs_proc=NULL; + + /************************************************************************* + *** 7e. 
Perform the actual communication + *************************************************************************/ + if (aggregator == rank ) { + for (i = 0; i < data->procs_per_group; i++) { + size_t datatype_size; + reqs[i] = MPI_REQUEST_NULL; + if (0 < data->disp_index[i]) { + ompi_datatype_create_hindexed (data->disp_index[i], + data->blocklen_per_process[i], + data->displs_per_process[i], + MPI_BYTE, + &data->recvtype[i]); + ompi_datatype_commit (&data->recvtype[i]); + opal_datatype_type_size (&data->recvtype[i]->super, &datatype_size); + if (datatype_size){ + ret = MCA_PML_CALL(isend(data->global_buf, + 1, data->recvtype[i], + data->procs_in_group[i], + FCOLL_VULCAN_SHUFFLE_TAG+index, + MCA_PML_BASE_SEND_STANDARD, + data->comm, &reqs[i])); + if (OMPI_SUCCESS != ret){ + goto exit; + } + } + } + } + // } /* end if (entries_per_aggr > 0 ) */ + }/* end if (aggregator == rank ) */ + + reqs[data->procs_per_group] = MPI_REQUEST_NULL; + if (data->bytes_sent) { + size_t remaining = data->bytes_sent; + int block_index = -1; + int blocklength_size = INIT_LEN; + + ptrdiff_t recv_mem_address = 0; + ompi_datatype_t *newType = MPI_DATATYPE_NULL; + blocklength_proc = (int *) calloc (blocklength_size, sizeof (int)); + displs_proc = (ptrdiff_t *) calloc (blocklength_size, sizeof (ptrdiff_t)); + + if (NULL == blocklength_proc || NULL == displs_proc ) { + opal_output (1, "OUT OF MEMORY\n"); + ret = OMPI_ERR_OUT_OF_RESOURCE; + goto exit; + } + + while (remaining) { + block_index++; + + if(0 == block_index) { + recv_mem_address = (ptrdiff_t) (data->decoded_iov[data->iov_index].iov_base) + + data->current_position; + } + else { + // Reallocate more memory if blocklength_size is not enough + if(0 == block_index % INIT_LEN) { + blocklength_size += INIT_LEN; + blocklength_proc = (int *) realloc(blocklength_proc, blocklength_size * sizeof(int)); + displs_proc = (ptrdiff_t *) realloc(displs_proc, blocklength_size * sizeof(ptrdiff_t)); + } + displs_proc[block_index] = (ptrdiff_t) (data->decoded_iov[data->iov_index].iov_base) + + data->current_position - recv_mem_address; + } + + if (remaining >= + (data->decoded_iov[data->iov_index].iov_len - data->current_position)) { + + blocklength_proc[block_index] = data->decoded_iov[data->iov_index].iov_len - + data->current_position; + remaining = remaining - (data->decoded_iov[data->iov_index].iov_len - + data->current_position); + data->iov_index = data->iov_index + 1; + data->current_position = 0; + } else { + blocklength_proc[block_index] = remaining; + data->current_position += remaining; + remaining = 0; + } + } + + data->total_bytes_written += data->bytes_sent; + + if (0 <= block_index) { + ompi_datatype_create_hindexed (block_index+1, + blocklength_proc, + displs_proc, + MPI_BYTE, + &newType); + ompi_datatype_commit (&newType); + + ret = MCA_PML_CALL(irecv((char *)recv_mem_address, + 1, + newType, + aggregator, + FCOLL_VULCAN_SHUFFLE_TAG+index, + data->comm, + &reqs[data->procs_per_group])); + if (MPI_DATATYPE_NULL != newType) { + ompi_datatype_destroy(&newType); + } + if (OMPI_SUCCESS != ret){ + goto exit; + } + } + } +exit: + return ret; +} diff --git a/ompi/mca/fcoll/vulcan/fcoll_vulcan_file_write_all.c b/ompi/mca/fcoll/vulcan/fcoll_vulcan_file_write_all.c index 3748948bea5..b6e9be6d2ca 100644 --- a/ompi/mca/fcoll/vulcan/fcoll_vulcan_file_write_all.c +++ b/ompi/mca/fcoll/vulcan/fcoll_vulcan_file_write_all.c @@ -26,6 +26,7 @@ #include "ompi_config.h" #include "fcoll_vulcan.h" +#include "fcoll_vulcan_internal.h" #include "mpi.h" #include "ompi/constants.h" @@ -43,88 
+44,16 @@ #define DEBUG_ON 0 #define NOT_AGGR_INDEX -1 -/*Used for loading file-offsets per aggregator*/ -typedef struct mca_io_ompio_local_io_array{ - OMPI_MPI_OFFSET_TYPE offset; - MPI_Aint length; - int process_id; -}mca_io_ompio_local_io_array; - -typedef struct mca_io_ompio_aggregator_data { - int *disp_index, *sorted, n; - size_t *fview_count; - int *max_disp_index; - int **blocklen_per_process; - MPI_Aint **displs_per_process, total_bytes, bytes_per_cycle, total_bytes_written; - MPI_Comm comm; - char *global_buf, *prev_global_buf; - ompi_datatype_t **recvtype, **prev_recvtype; - struct iovec *global_iov_array; - int current_index, current_position; - int bytes_to_write_in_cycle, bytes_remaining, procs_per_group; - int *procs_in_group, iov_index; - int bytes_sent, prev_bytes_sent; - struct iovec *decoded_iov; - int bytes_to_write, prev_bytes_to_write; - mca_common_ompio_io_array_t *io_array, *prev_io_array; - int num_io_entries, prev_num_io_entries; -} mca_io_ompio_aggregator_data; - - -#define SWAP_REQUESTS(_r1,_r2) { \ - ompi_request_t **_t=_r1; \ - _r1=_r2; \ - _r2=_t;} - -#define SWAP_AGGR_POINTERS(_aggr,_num) { \ - int _i; \ - char *_t; \ - for (_i=0; _i<_num; _i++ ) { \ - _aggr[_i]->prev_io_array=_aggr[_i]->io_array; \ - _aggr[_i]->prev_num_io_entries=_aggr[_i]->num_io_entries; \ - _aggr[_i]->prev_bytes_sent=_aggr[_i]->bytes_sent; \ - _aggr[_i]->prev_bytes_to_write=_aggr[_i]->bytes_to_write; \ - _t=_aggr[_i]->prev_global_buf; \ - _aggr[_i]->prev_global_buf=_aggr[_i]->global_buf; \ - _aggr[_i]->global_buf=_t; \ - _t=(char *)_aggr[_i]->recvtype; \ - _aggr[_i]->recvtype=_aggr[_i]->prev_recvtype; \ - _aggr[_i]->prev_recvtype=(ompi_datatype_t **)_t; } \ -} static int shuffle_init (int index, int num_cycles, int aggregator, int rank, mca_io_ompio_aggregator_data *data, ompi_request_t **reqs); + static int write_init (ompio_file_t *fh, int aggregator, mca_io_ompio_aggregator_data *aggr_data, int write_syncType, ompi_request_t **request, bool is_accelerator_buffer); -int mca_fcoll_vulcan_break_file_view (struct iovec *decoded_iov, int iov_count, - struct iovec *local_iov_array, int local_count, - struct iovec ***broken_decoded_iovs, int **broken_iov_counts, - struct iovec ***broken_iov_arrays, int **broken_counts, - MPI_Aint **broken_total_lengths, - int stripe_count, size_t stripe_size); - -int mca_fcoll_vulcan_get_configuration (ompio_file_t *fh, int num_io_procs, - size_t max_data); - static int local_heap_sort (mca_io_ompio_local_io_array *io_array, - int num_entries, int *sorted); - -int mca_fcoll_vulcan_minmax (ompio_file_t *fh, struct iovec *iov, int iov_count, int num_aggregators, - long *new_stripe_size); - -void mca_fcoll_vulcan_calc_blocklen_disps (mca_io_ompio_aggregator_data *data, int aggregator, - int rank, size_t *bytes_comm); - -int mca_fcoll_vulcan_calc_file_offsets(mca_io_ompio_aggregator_data *data, - mca_io_ompio_local_io_array *file_offsets_for_agg, - int *sorted_file_offsets, MPI_Aint *memory_displacements, - int entries_per_aggregator, int rank, int index); - -void mca_fcoll_vulcan_calc_io_array(mca_common_ompio_io_array_t *io_array, int *num_io_entries, int max_io_arrays, - char *global_buf, mca_io_ompio_local_io_array *file_offsets_for_agg, - int *sorted_offsets, MPI_Aint *memory_displacements, int rank); + int num_entries, int *sorted); int mca_fcoll_vulcan_file_write_all (struct ompio_file_t *fh, const void *buf, @@ -216,6 +145,8 @@ int mca_fcoll_vulcan_file_write_all (struct ompio_file_t *fh, if (OMPI_SUCCESS != ret){ goto exit; } + opal_output_verbose(10, 
ompi_fcoll_base_framework.framework_output, + "Using %d aggregators for the write_all operation \n", fh->f_num_aggrs); aggr_data = (mca_io_ompio_aggregator_data **) malloc ( fh->f_num_aggrs * sizeof(mca_io_ompio_aggregator_data*)); @@ -1566,8 +1497,7 @@ void mca_fcoll_vulcan_calc_io_array(mca_common_ompio_io_array_t *io_array, int * } static int local_heap_sort (mca_io_ompio_local_io_array *io_array, - int num_entries, - int *sorted) + int num_entries, int *sorted) { int i = 0; int j = 0; @@ -1667,5 +1597,3 @@ static int local_heap_sort (mca_io_ompio_local_io_array *io_array, } return OMPI_SUCCESS; } - - diff --git a/ompi/mca/fcoll/vulcan/fcoll_vulcan_internal.h b/ompi/mca/fcoll/vulcan/fcoll_vulcan_internal.h new file mode 100644 index 00000000000..76402297044 --- /dev/null +++ b/ompi/mca/fcoll/vulcan/fcoll_vulcan_internal.h @@ -0,0 +1,93 @@ +/* + * Copyright (c) 2024 Advanced Micro Devices, Inc. All rights reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#ifndef MCA_FCOLL_VULCAN_INTERNAL_H +#define MCA_FCOLL_VULCAN_INTERNAL_H + +#include "ompi_config.h" + + +BEGIN_C_DECLS +/* Used for loading file-offsets per aggregator*/ +typedef struct mca_io_ompio_local_io_array{ + OMPI_MPI_OFFSET_TYPE offset; + MPI_Aint length; + int process_id; +}mca_io_ompio_local_io_array; + +typedef struct mca_io_ompio_aggregator_data { + int *disp_index, *sorted, n; + size_t *fview_count; + int *max_disp_index; + int **blocklen_per_process; + MPI_Aint **displs_per_process, total_bytes, bytes_per_cycle, total_bytes_written; + MPI_Comm comm; + char *global_buf, *prev_global_buf; + ompi_datatype_t **recvtype, **prev_recvtype; + struct iovec *global_iov_array; + int current_index, current_position; + int bytes_to_write_in_cycle, bytes_remaining, procs_per_group; + int *procs_in_group, iov_index; + size_t bytes_sent, prev_bytes_sent; + struct iovec *decoded_iov; + int bytes_to_write, prev_bytes_to_write; + mca_common_ompio_io_array_t *io_array, *prev_io_array; + int num_io_entries, prev_num_io_entries; +} mca_io_ompio_aggregator_data; + + +#define SWAP_REQUESTS(_r1,_r2) { \ + ompi_request_t **_t=_r1; \ + _r1=_r2; \ + _r2=_t;} + +#define SWAP_AGGR_POINTERS(_aggr,_num) { \ + int _i; \ + char *_t; \ + for (_i=0; _i<_num; _i++ ) { \ + _aggr[_i]->prev_io_array=_aggr[_i]->io_array; \ + _aggr[_i]->prev_num_io_entries=_aggr[_i]->num_io_entries; \ + _aggr[_i]->prev_bytes_sent=_aggr[_i]->bytes_sent; \ + _aggr[_i]->prev_bytes_to_write=_aggr[_i]->bytes_to_write; \ + _t=_aggr[_i]->prev_global_buf; \ + _aggr[_i]->prev_global_buf=_aggr[_i]->global_buf; \ + _aggr[_i]->global_buf=_t; \ + _t=(char *)_aggr[_i]->recvtype; \ + _aggr[_i]->recvtype=_aggr[_i]->prev_recvtype; \ + _aggr[_i]->prev_recvtype=(ompi_datatype_t **)_t; } \ +} + +int mca_fcoll_vulcan_break_file_view (struct iovec *decoded_iov, int iov_count, + struct iovec *local_iov_array, int local_count, + struct iovec ***broken_decoded_iovs, int **broken_iov_counts, + struct iovec ***broken_iov_arrays, int **broken_counts, + MPI_Aint **broken_total_lengths, + int stripe_count, size_t stripe_size); + +int mca_fcoll_vulcan_get_configuration (ompio_file_t *fh, int num_io_procs, + size_t max_data); + +int mca_fcoll_vulcan_minmax (ompio_file_t *fh, struct iovec *iov, int iov_count, + int num_aggregators, long *new_stripe_size); + +void mca_fcoll_vulcan_calc_blocklen_disps (mca_io_ompio_aggregator_data *data, int aggregator, + int rank, size_t *bytes_comm); + +int mca_fcoll_vulcan_calc_file_offsets(mca_io_ompio_aggregator_data *data, + 
mca_io_ompio_local_io_array *file_offsets_for_agg, + int *sorted_file_offsets, MPI_Aint *memory_displacements, + int entries_per_aggregator, int rank, int index); + +void mca_fcoll_vulcan_calc_io_array(mca_common_ompio_io_array_t *io_array, int *num_io_entries, int max_io_arrays, + char *global_buf, mca_io_ompio_local_io_array *file_offsets_for_agg, + int *sorted_offsets, MPI_Aint *memory_displacements, int rank); + +END_C_DECLS + +#endif /* MCA_FCOLL_VULCAN_INTERNAL_H */