📄 io_romio_ad_write_coll.c
字号:
/* -*- Mode: C; c-basic-offset:4 ; -*- */
/*
 *
 *   Copyright (C) 1997 University of Chicago.
 *   See COPYRIGHT notice in top-level directory.
 */

#include "adio.h"
#include "adio_extern.h"
#ifdef PROFILE
#include "mpe.h"
#endif
#include "limits.h"

/* prototypes of functions used for collective writes only. */
static void ADIOI_Exch_and_write(ADIO_File fd, void *buf, MPI_Datatype
                         datatype, int nprocs, int myrank,
                         int interleave_count, ADIOI_Access
                         *others_req, ADIO_Offset *offset_list,
                         int *len_list, int contig_access_count, ADIO_Offset
                         min_st_offset, ADIO_Offset fd_size,
                         ADIO_Offset *fd_start, ADIO_Offset *fd_end,
                         int *buf_idx, int *error_code);
static void ADIOI_W_Exchange_data(ADIO_File fd, void *buf, char *write_buf,
                         ADIOI_Flatlist_node *flat_buf, ADIO_Offset
                         *offset_list, int *len_list, int *send_size,
                         int *recv_size, ADIO_Offset off, int size,
                         int *count, int *start_pos, int *partial_recv,
                         int *sent_to_proc, int nprocs, int myrank, int
                         buftype_is_contig, int contig_access_count,
                         ADIO_Offset min_st_offset, ADIO_Offset fd_size,
                         ADIO_Offset *fd_start, ADIO_Offset *fd_end,
                         ADIOI_Access *others_req,
                         int *send_buf_idx, int *curr_to_proc,
                         int *done_to_proc, int *hole, int iter,
                         MPI_Aint buftype_extent, int *buf_idx, int *error_code);
static void ADIOI_Fill_send_buffer(ADIO_File fd, void *buf, ADIOI_Flatlist_node
                           *flat_buf, char **send_buf, ADIO_Offset
                           *offset_list, int *len_list, int *send_size,
                           MPI_Request *requests, int *sent_to_proc,
                           int nprocs, int myrank,
                           int contig_access_count, ADIO_Offset
                           min_st_offset, ADIO_Offset fd_size,
                           ADIO_Offset *fd_start, ADIO_Offset *fd_end,
                           int *send_buf_idx, int *curr_to_proc,
                           int *done_to_proc, int iter,
                           MPI_Aint buftype_extent);
static void ADIOI_Heap_merge(ADIOI_Access *others_req, int *count,
                      ADIO_Offset *srt_off, int *srt_len, int *start_pos,
                      int nprocs, int nprocs_recv, int total_elements);


/*
 * ADIOI_GEN_WriteStridedColl - generic collective strided write.
 *
 * fd            - open ADIO file handle; its hints select between collective
 *                 buffering and independent writes
 * buf           - user buffer holding the data to be written
 * count         - number of `datatype` elements in buf
 * datatype      - MPI datatype describing the layout of buf
 * file_ptr_type - ADIO_EXPLICIT_OFFSET or an individual-file-pointer mode
 * offset        - for ADIO_EXPLICIT_OFFSET, offset in units of etypes
 *                 (converted below with fd->disp and fd->etype_size)
 * status        - if non-NULL and HAVE_STATUS_SET_BYTES is defined, filled
 *                 with count * sizeof(datatype) bytes on the collective path
 * error_code    - receives MPI_SUCCESS or an MPI error code
 *
 * When the cb_write hint is disabled, or it is "auto" and the requests of
 * the processes are not interleaved, every process falls back to an
 * independent ADIO_WriteContig / ADIO_WriteStrided.  Otherwise the data is
 * exchanged between processes and written by the aggregators.
 */
void ADIOI_GEN_WriteStridedColl(ADIO_File fd, void *buf, int count,
                       MPI_Datatype datatype, int file_ptr_type,
                       ADIO_Offset offset, ADIO_Status *status, int
                       *error_code)
{
/* Uses a generalized version of the extended two-phase method described
   in "An Extended Two-Phase Method for Accessing Sections of
   Out-of-Core Arrays", Rajeev Thakur and Alok Choudhary,
   Scientific Programming, (5)4:301--317, Winter 1996.
   http://www.mcs.anl.gov/home/thakur/ext2ph.ps */

    ADIOI_Access *my_req;
    /* array of nprocs access structures, one for each other process in
       whose file domain this process's request lies */

    ADIOI_Access *others_req;
    /* array of nprocs access structures, one for each other process
       whose request lies in this process's file domain. */

    int i, filetype_is_contig, nprocs, nprocs_for_coll, myrank;
    int contig_access_count=0, interleave_count = 0, buftype_is_contig;
    int *count_my_req_per_proc, count_my_req_procs, count_others_req_procs;
    ADIO_Offset orig_fp, start_offset, end_offset, fd_size, min_st_offset, off;
    ADIO_Offset *offset_list = NULL, *st_offsets = NULL, *fd_start = NULL,
        *fd_end = NULL, *end_offsets = NULL;
    int *buf_idx = NULL, *len_list = NULL;
    /* MPI_Info_get() is called with valuelen == MPI_MAX_INFO_VAL; in C the
       buffer must hold valuelen characters plus the terminating null. */
    char value[MPI_MAX_INFO_VAL + 1];
    int info_flag, ompi_parallel_opts = 0;
    int old_error;

    /* The mere presence of the key enables the optimizations; its value is
       not inspected. */
    MPI_Info_get(fd->info, "ompi_enable_parallel_optimizations", MPI_MAX_INFO_VAL,
                 value, &info_flag);
    if (info_flag)
        ompi_parallel_opts = 1;

#ifdef PROFILE
    MPE_Log_event(13, 0, "start computation");
#endif

    MPI_Comm_size(fd->comm, &nprocs);
    MPI_Comm_rank(fd->comm, &myrank);

/* the number of processes that actually perform I/O, nprocs_for_coll,
 * is stored in the hints off the ADIO_File structure
 */
    nprocs_for_coll = fd->hints->cb_nodes;
    orig_fp = fd->fp_ind;

    /* only check for interleaving if cb_write isn't disabled */
    if (fd->hints->cb_write != ADIOI_HINT_DISABLE) {
        /* For this process's request, calculate the list of offsets and
           lengths in the file and determine the start and end offsets. */

        /* Note: end_offset points to the last byte-offset that will be
           accessed.  e.g., if start_offset=0 and 100 bytes are to be
           written, end_offset=99 */

        ADIOI_Calc_my_off_len(fd, count, datatype, file_ptr_type, offset,
                              &offset_list, &len_list, &start_offset,
                              &end_offset, &contig_access_count);

        /* each process communicates its start and end offsets to other
           processes. The result is an array each of start and end offsets
           stored in order of process rank. */

        st_offsets = (ADIO_Offset *) ADIOI_Malloc(nprocs*sizeof(ADIO_Offset));
        end_offsets = (ADIO_Offset *) ADIOI_Malloc(nprocs*sizeof(ADIO_Offset));

        if (ompi_parallel_opts) {
            /* OMPI: reduce the collectives calls from 2 to 1, to improve
               scaling */
            ADIO_Offset *stend_offsets, min_rd_st_offset, max_rd_end_offset,
                total_rd_size;
            ADIO_Offset my_offsets[2];
            int nprocs_for_creq;

            stend_offsets = (ADIO_Offset *)
                ADIOI_Malloc(2*nprocs*sizeof(ADIO_Offset));
            my_offsets[0] = start_offset;
            my_offsets[1] = end_offset;
            MPI_Allgather(my_offsets, 2, ADIO_OFFSET, stend_offsets, 2,
                          ADIO_OFFSET, fd->comm);

            /* Unpack the interleaved (start, end) pairs and track the
               overall extent of the collective request. */
            min_rd_st_offset = stend_offsets[0];
            max_rd_end_offset = stend_offsets[1];
            for (i=0; i<nprocs; i++) {
                st_offsets [i] = stend_offsets[i*2  ];
                end_offsets[i] = stend_offsets[i*2+1];
                min_rd_st_offset = ADIOI_MIN(st_offsets [i], min_rd_st_offset);
                max_rd_end_offset = ADIOI_MAX(end_offsets[i], max_rd_end_offset);
            }
            ADIOI_Free(stend_offsets);

            MPI_Info_get(fd->info, "ompi_cb_nodes_runtime_override",
                         MPI_MAX_INFO_VAL, value, &info_flag);
            if (info_flag) {
                /* ------------------------------------------------------------------ */
                /* OMPI: swh@lanl.gov (Steve Hodson):                                 */
                /* If user has not specified cb_nodes then calculate it as follows:   */
                /* 1)nprocs_for_coll depends initially on the collective request size.*/
                /*   For larger requests the denominator is directly proportional to  */
                /*   the number of times the collective buffer is reused per request. */
                /* 2)nprocs_for_coll limited to 1/4 the number of processes           */
                /* 3)nprocs_for_coll is at least 1/32 the number of processes         */
                /* 4)nprocs_for_coll limited to range 1-32. Need at least 1,          */
                /*   but don't exceed expected number of disks in use at a time       */
                /* 5)nprocs_for_coll even workaround                                  */
                /* 6)nprocs_for_coll at least 2 for more than 15 processes,           */
                /*   regardless of how small collective request is.                   */
                /* Caveat:                                                            */
                /*   The preceding recipe was arrived at empirically for the          */
                /*   Panasas file system on Flash. Applicability to other file systems*/
                /*   needs to be demonstrated.                                        */
                /* Caution: Care must be taken below to make sure that nprocs_for_coll*/
                /* NEVER exceeds the default aggregator configuration list build once */
                /* in open: ADIOI_cb_config_list_parse. Since nprocs_for_coll is      */
                /* usually less than this number, only a subset of the previously     */
                /* allocated aggregators will be used.                                */
                /* ------------------------------------------------------------------ */
                total_rd_size = max_rd_end_offset - min_rd_st_offset + 1;
                nprocs_for_creq = (int)(total_rd_size / ( 8 * 1024 * 1024 ));
                nprocs_for_coll = ADIOI_MIN(nprocs_for_creq, nprocs/ 4);
                nprocs_for_coll = ADIOI_MAX(nprocs_for_coll, nprocs/32);
                nprocs_for_coll = ADIOI_MAX(nprocs_for_coll, 1);
                nprocs_for_coll = ADIOI_MIN(nprocs_for_coll, 32);
                if (nprocs_for_coll > 1 && nprocs_for_coll%2)
                    nprocs_for_coll--;
                if (nprocs > 15)
                    nprocs_for_coll = ADIOI_MAX(nprocs_for_coll, 2);

                /* Enforce the caution above: never use more aggregators than
                   were configured at open time.  NOTE(review): this assumes
                   fd->hints->cb_nodes is the length of fd->hints->ranklist --
                   confirm against ADIOI_cb_config_list_parse.  It also keeps
                   the cb_nodes == 1 Bcast shortcut at the end of this
                   function consistent with the aggregator count used. */
                nprocs_for_coll = ADIOI_MIN(nprocs_for_coll,
                                            fd->hints->cb_nodes);
            }
        }
        else {
            MPI_Allgather(&start_offset, 1, ADIO_OFFSET, st_offsets, 1,
                          ADIO_OFFSET, fd->comm);
            MPI_Allgather(&end_offset, 1, ADIO_OFFSET, end_offsets, 1,
                          ADIO_OFFSET, fd->comm);
        }

        /* are the accesses of different processes interleaved? */
        for (i=1; i<nprocs; i++) {
            if ((st_offsets[i] < end_offsets[i-1]) &&
                (st_offsets[i] <= end_offsets[i]))
                interleave_count++;
        }
        /* This is a rudimentary check for interleaving, but should suffice
           for the moment. */
    }

    ADIOI_Datatype_iscontig(datatype, &buftype_is_contig);

    if (fd->hints->cb_write == ADIOI_HINT_DISABLE ||
        (!interleave_count && (fd->hints->cb_write == ADIOI_HINT_AUTO)))
    {
        /* use independent accesses */
        if (fd->hints->cb_write != ADIOI_HINT_DISABLE) {
            /* these were only allocated when the interleave check ran */
            ADIOI_Free(offset_list);
            ADIOI_Free(len_list);
            ADIOI_Free(st_offsets);
            ADIOI_Free(end_offsets);
        }

        /* restore the individual file pointer saved on entry */
        fd->fp_ind = orig_fp;
        ADIOI_Datatype_iscontig(fd->filetype, &filetype_is_contig);

        if (buftype_is_contig && filetype_is_contig) {
            if (file_ptr_type == ADIO_EXPLICIT_OFFSET) {
                off = fd->disp + (fd->etype_size) * offset;
                ADIO_WriteContig(fd, buf, count, datatype,
                                 ADIO_EXPLICIT_OFFSET,
                                 off, status, error_code);
            }
            else
                ADIO_WriteContig(fd, buf, count, datatype, ADIO_INDIVIDUAL,
                                 0, status, error_code);
        }
        else
            ADIO_WriteStrided(fd, buf, count, datatype, file_ptr_type,
                              offset, status, error_code);

        return;
    }

/* Divide the I/O workload among "nprocs_for_coll" processes. This is
   done by (logically) dividing the file into file domains (FDs); each
   process may directly access only its own file domain. */

    ADIOI_Calc_file_domains(st_offsets, end_offsets, nprocs,
                            nprocs_for_coll, &min_st_offset,
                            &fd_start, &fd_end, &fd_size);

/* calculate what portions of the access requests of this process are
   located in what file domains */

    ADIOI_Calc_my_req(fd, offset_list, len_list, contig_access_count,
                      min_st_offset, fd_start, fd_end, fd_size,
                      nprocs, &count_my_req_procs,
                      &count_my_req_per_proc, &my_req,
                      &buf_idx);

/* based on everyone's my_req, calculate what requests of other
   processes lie in this process's file domain.
   count_others_req_procs = number of processes whose requests lie in
   this process's file domain (including this process itself)
   count_others_req_per_proc[i] indicates how many separate contiguous
   requests of proc. i lie in this process's file domain. */

    ADIOI_Calc_others_req(fd, count_my_req_procs,
                          count_my_req_per_proc, my_req,
                          nprocs, myrank,
                          &count_others_req_procs, &others_req);

    ADIOI_Free(count_my_req_per_proc);
    for (i=0; i < nprocs; i++) {
        if (my_req[i].count) {
            ADIOI_Free(my_req[i].offsets);
            ADIOI_Free(my_req[i].lens);
        }
    }
    ADIOI_Free(my_req);

/* exchange data and write in sizes of no more than coll_bufsize. */
    ADIOI_Exch_and_write(fd, buf, datatype, nprocs, myrank, interleave_count,
                         others_req, offset_list,
                         len_list, contig_access_count, min_st_offset,
                         fd_size, fd_start, fd_end, buf_idx, error_code);

    /* If this collective write is followed by an independent write,
     * it's possible to have those subsequent writes on other processes
     * race ahead and sneak in before the read-modify-write completes.
     * We carry out a collective communication at the end here so no one
     * can start independent i/o before collective I/O completes.
     *
     * optimization: if only one process performing i/o, we can perform
     * a less-expensive Bcast
     *
     * need to do some gymnastics with the error codes so that if something
     * went wrong, all processes report error, but if a process has a more
     * specific error code, we can still have that process report the
     * additional information */

    old_error = *error_code;
    if (*error_code != MPI_SUCCESS)
        *error_code = MPI_ERR_IO;

    if (fd->hints->cb_nodes == 1)
        MPI_Bcast(error_code, 1, MPI_INT,
                  fd->hints->ranklist[0], fd->comm);
    else
        MPI_Allreduce(MPI_IN_PLACE, error_code, 1, MPI_INT,
                      MPI_MAX, fd->comm);

    /* a locally recorded, more specific error takes precedence over the
       generic MPI_ERR_IO agreed on collectively */
    if ( (old_error != MPI_SUCCESS) && (old_error != MPI_ERR_IO) )
        *error_code = old_error;

    if (!buftype_is_contig)
        ADIOI_Delete_flattened(datatype);

/* free all memory allocated for collective I/O */

    for (i=0; i<nprocs; i++) {
        if (others_req[i].count) {
            ADIOI_Free(others_req[i].offsets);
            ADIOI_Free(others_req[i].lens);
            ADIOI_Free(others_req[i].mem_ptrs);
        }
    }
    ADIOI_Free(others_req);

    ADIOI_Free(buf_idx);
    ADIOI_Free(offset_list);
    ADIOI_Free(len_list);
    ADIOI_Free(st_offsets);
    ADIOI_Free(end_offsets);
    ADIOI_Free(fd_start);
    ADIOI_Free(fd_end);

#ifdef HAVE_STATUS_SET_BYTES
    if (status) {
        int bufsize, size;
        /* Don't set status if it isn't needed */
        MPI_Type_size(datatype, &size);
        bufsize = size * count;
        MPIR_Status_set_bytes(status, datatype, bufsize);
    }
/* This is a temporary way of filling in status. The right way is to
   keep track of how much data was actually written during collective I/O. */
#endif

    fd->fp_sys_posn = -1;   /* set it to null. */
}


/* If successful, error_code is set to MPI_SUCCESS.  Otherwise an error
 * code is created and returned in error_code.
 */
static void ADIOI_Exch_and_write(ADIO_File fd, void *buf, MPI_Datatype
                         datatype, int nprocs, int myrank,
                         int interleave_count, ADIOI_Access
                         *others_req, ADIO_Offset *offset_list,
                         int *len_list, int contig_access_count, ADIO_Offset
                         min_st_offset, ADIO_Offset fd_size,
                         ADIO_Offset *fd_start, ADIO_Offset *fd_end,
                         int *buf_idx, int *error_code)
{
/* Send data to appropriate processes and write in sizes of no more
   than coll_bufsize.
   The idea is to reduce the amount of extra memory required for
   collective I/O. If all data were written all at once, which is much
   easier, it would require temp space more than the size of user_buf,
   which is often unacceptable. For example, to write a distributed
   array to a file, where each local array is 8Mbytes, requiring
   at least another 8Mbytes of temp space is unacceptable. */

    /* TODO: 'hole' not used outside of ADIOI_W_Exchange_data */
    int hole, i, j, m, size=0, ntimes, max_ntimes, buftype_is_contig;
    ADIO_Offset st_loc=-1, end_loc=-1, off, done, req_off;
    char *write_buf=NULL;
    int *curr_offlen_ptr, *count, *send_size, req_len, *recv_size;
    int *partial_recv, *sent_to_proc, *start_pos, flag;
    int *send_buf_idx, *curr_to_proc, *done_to_proc;
    MPI_Status status;
    ADIOI_Flatlist_node *flat_buf=NULL;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -