ad_bgl_wrcoll.c
    fd->fp_sys_posn = -1;   /* set it to null. */
}


/* If successful, error_code is set to MPI_SUCCESS.  Otherwise an error
 * code is created and returned in error_code.
 */
static void ADIOI_Exch_and_write(ADIO_File fd, void *buf, MPI_Datatype datatype,
                                 int nprocs, int myrank,
                                 ADIOI_Access *others_req,
                                 ADIO_Offset *offset_list, int *len_list,
                                 int contig_access_count,
                                 ADIO_Offset min_st_offset, ADIO_Offset fd_size,
                                 ADIO_Offset *fd_start, ADIO_Offset *fd_end,
                                 int *buf_idx, int *error_code)
{
/* Send data to appropriate processes and write in sizes of no more
   than coll_bufsize. The idea is to reduce the amount of extra memory
   required for collective I/O. If all data were written all at once,
   which is much easier, it would require temp space more than the size
   of user_buf, which is often unacceptable. For example, to write a
   distributed array to a file, where each local array is 8 Mbytes,
   requiring at least another 8 Mbytes of temp space is unacceptable. */

    int hole, i, j, m, size=0, ntimes, max_ntimes, buftype_is_contig;
    ADIO_Offset st_loc=-1, end_loc=-1, off, done, req_off;
    char *write_buf=NULL;
    int *curr_offlen_ptr, *count, *send_size, req_len, *recv_size;
    int *partial_recv, *sent_to_proc, *start_pos, flag;
    int *send_buf_idx, *curr_to_proc, *done_to_proc;
    MPI_Status status;
    ADIOI_Flatlist_node *flat_buf=NULL;
    MPI_Aint buftype_extent;
    int info_flag, coll_bufsize;
    char *value;
    static char myname[] = "ADIOI_EXCH_AND_WRITE";

    *error_code = MPI_SUCCESS;  /* changed below if error */
    /* only I/O errors are currently reported */

/* calculate the number of writes of size coll_bufsize to be done by
   each process and the max among all processes. That gives the no. of
   communication phases as well. */

    value = (char *) ADIOI_Malloc((MPI_MAX_INFO_VAL+1)*sizeof(char));
    MPI_Info_get(fd->info, "cb_buffer_size", MPI_MAX_INFO_VAL, value,
                 &info_flag);
    coll_bufsize = atoi(value);
    ADIOI_Free(value);
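    /* Illustrative aside (not from the original BG/L source):
       cb_buffer_size is an MPI-IO hint, so it arrives here as a string
       and is converted with atoi() above. A user would typically set it
       before opening the file, along these lines:

           MPI_Info info;
           MPI_File fh;
           MPI_Info_create(&info);
           MPI_Info_set(info, "cb_buffer_size", "16777216");
           MPI_File_open(MPI_COMM_WORLD, "out.dat",
                         MPI_MODE_CREATE | MPI_MODE_WRONLY, info, &fh);

       The value 16777216 (16 MB) is only an example; the default buffer
       size is implementation-dependent. */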
    for (i=0; i < nprocs; i++) {
        if (others_req[i].count) {
            st_loc = others_req[i].offsets[0];
            end_loc = others_req[i].offsets[0];
            break;
        }
    }

    for (i=0; i < nprocs; i++)
        for (j=0; j < others_req[i].count; j++) {
            st_loc = ADIOI_MIN(st_loc, others_req[i].offsets[j]);
            end_loc = ADIOI_MAX(end_loc, (others_req[i].offsets[j]
                                          + others_req[i].lens[j] - 1));
        }

/* ntimes = ceiling_div(end_loc - st_loc + 1, coll_bufsize) */

    ntimes = (int) ((end_loc - st_loc + coll_bufsize)/coll_bufsize);

    if ((st_loc==-1) && (end_loc==-1)) {
        ntimes = 0;  /* this process does no writing. */
    }

    MPI_Allreduce(&ntimes, &max_ntimes, 1, MPI_INT, MPI_MAX, fd->comm);

    if (ntimes) write_buf = (char *) ADIOI_Malloc(coll_bufsize);

    curr_offlen_ptr = (int *) ADIOI_Calloc(nprocs, sizeof(int));
    /* its use is explained below. calloc initializes to 0. */

    count = (int *) ADIOI_Malloc(nprocs*sizeof(int));
    /* to store count of how many off-len pairs per proc are satisfied
       in an iteration. */

    partial_recv = (int *) ADIOI_Calloc(nprocs, sizeof(int));
    /* if only a portion of the last off-len pair is recd. from a process
       in a particular iteration, the length recd. is stored here.
       calloc initializes to 0. */

    send_size = (int *) ADIOI_Malloc(nprocs*sizeof(int));
    /* total size of data to be sent to each proc. in an iteration.
       Of size nprocs so that I can use MPI_Alltoall later. */

    recv_size = (int *) ADIOI_Malloc(nprocs*sizeof(int));
    /* total size of data to be recd. from each proc. in an iteration. */

    sent_to_proc = (int *) ADIOI_Calloc(nprocs, sizeof(int));
    /* amount of data sent to each proc so far. Used in
       ADIOI_Fill_send_buffer. initialized to 0 here. */

    send_buf_idx = (int *) ADIOI_Malloc(nprocs*sizeof(int));
    curr_to_proc = (int *) ADIOI_Malloc(nprocs*sizeof(int));
    done_to_proc = (int *) ADIOI_Malloc(nprocs*sizeof(int));
    /* above three are used in ADIOI_Fill_send_buffer */

    start_pos = (int *) ADIOI_Malloc(nprocs*sizeof(int));
    /* used to store the starting value of curr_offlen_ptr[i] in
       this iteration */

    ADIOI_Datatype_iscontig(datatype, &buftype_is_contig);
    if (!buftype_is_contig) {
        ADIOI_Flatten_datatype(datatype);
        flat_buf = ADIOI_Flatlist;
        while (flat_buf->type != datatype) flat_buf = flat_buf->next;
    }
    MPI_Type_extent(datatype, &buftype_extent);

/* I need to check if there are any outstanding nonblocking writes to
   the file, which could potentially interfere with the writes taking
   place in this collective write call. Since this is not likely to be
   common, let me do the simplest thing possible here: Each process
   completes all pending nonblocking operations before completing. */

    /* ADIOI_Complete_async(error_code);
       if (*error_code != MPI_SUCCESS) return;
       MPI_Barrier(fd->comm); */

    done = 0;
    off = st_loc;

#ifdef PROFILE
    MPE_Log_event(14, 0, "end computation");
#endif

    for (m=0; m < ntimes; m++) {
        /* go through all others_req and check which will be satisfied
           by the current write */

        /* Note that MPI guarantees that displacements in filetypes are
           in monotonically nondecreasing order and that, for writes, the
           filetypes cannot specify overlapping regions in the file. This
           simplifies implementation a bit compared to reads. */

        /* off      = start offset in the file for the data to be written
                      in this iteration
           size     = size of data written (bytes) corresponding to off
           req_off  = off in file for a particular contiguous request
                      minus what was satisfied in previous iteration
           req_size = size corresponding to req_off */

        /* first calculate what should be communicated */

#ifdef PROFILE
        MPE_Log_event(13, 0, "start computation");
#endif
        for (i=0; i < nprocs; i++) count[i] = recv_size[i] = 0;

        size = (int) (ADIOI_MIN(coll_bufsize, end_loc-st_loc+1-done));
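        /* Worked example (added commentary, numbers illustrative only):
           suppose the aggregate access region on this process runs from
           st_loc = 0 to end_loc = 10485759 (10 MB) and coll_bufsize =
           4194304 (4 MB). Then ntimes = (10485759 + 4194304) / 4194304
           = 3, and the windows computed by the line above have sizes
           4 MB, 4 MB, and 2 MB. Each iteration handles the byte range
           [off, off+size) in the file and advances done by size. */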
        for (i=0; i < nprocs; i++) {
            if (others_req[i].count) {
                start_pos[i] = curr_offlen_ptr[i];
                for (j=curr_offlen_ptr[i]; j<others_req[i].count; j++) {
                    if (partial_recv[i]) {
                        /* this request may have been partially
                           satisfied in the previous iteration. */
                        req_off = others_req[i].offsets[j] + partial_recv[i];
                        req_len = others_req[i].lens[j] - partial_recv[i];
                        partial_recv[i] = 0;
                        /* modify the off-len pair to reflect this change */
                        others_req[i].offsets[j] = req_off;
                        others_req[i].lens[j] = req_len;
                    }
                    else {
                        req_off = others_req[i].offsets[j];
                        req_len = others_req[i].lens[j];
                    }
                    if (req_off < off + size) {
                        count[i]++;
                        MPI_Address(write_buf+req_off-off,
                                    &(others_req[i].mem_ptrs[j]));
                        recv_size[i] += (int)(ADIOI_MIN(off + (ADIO_Offset)size
                                                        - req_off, req_len));

                        if (off+size-req_off < req_len) {
                            partial_recv[i] = (int) (off + size - req_off);

                            /* --BEGIN ERROR HANDLING-- */
                            if ((j+1 < others_req[i].count) &&
                                (others_req[i].offsets[j+1] < off+size)) {
                                *error_code = MPIO_Err_create_code(MPI_SUCCESS,
                                                  MPIR_ERR_RECOVERABLE,
                                                  myname, __LINE__,
                                                  MPI_ERR_ARG,
                                                  "Filetype specifies overlapping write regions (which is illegal according to the MPI-2 specification)",
                                                  0);
                                /* allow to continue since additional
                                 * communication might have to occur */
                            }
                            /* --END ERROR HANDLING-- */
                            break;
                        }
                    }
                    else break;
                }
                curr_offlen_ptr[i] = j;
            }
        }

#ifdef PROFILE
        MPE_Log_event(14, 0, "end computation");
        MPE_Log_event(7, 0, "start communication");
#endif

        if (bglmpio_comm == 1)
            ADIOI_W_Exchange_data(fd, buf, write_buf, flat_buf, offset_list,
                                  len_list, send_size, recv_size, off, size,
                                  count, start_pos, partial_recv, sent_to_proc,
                                  nprocs, myrank, buftype_is_contig,
                                  contig_access_count, min_st_offset, fd_size,
                                  fd_start, fd_end, others_req, send_buf_idx,
                                  curr_to_proc, done_to_proc, &hole, m,
                                  buftype_extent, buf_idx, error_code);
        else if (bglmpio_comm == 0)
            ADIOI_W_Exchange_data_alltoallv(fd, buf, write_buf, flat_buf,
                                  offset_list, len_list, send_size, recv_size,
                                  off, size, count, start_pos, partial_recv,
                                  sent_to_proc, nprocs, myrank,
                                  buftype_is_contig, contig_access_count,
                                  min_st_offset, fd_size, fd_start, fd_end,
                                  others_req, send_buf_idx, curr_to_proc,
                                  done_to_proc, &hole, m, buftype_extent,
                                  buf_idx, error_code);

        if (*error_code != MPI_SUCCESS) return;

#ifdef PROFILE
        MPE_Log_event(8, 0, "end communication");
#endif

        flag = 0;
        for (i=0; i<nprocs; i++)
            if (count[i]) flag = 1;

        if (flag) {
            ADIO_WriteContig(fd, write_buf, size, MPI_BYTE,
                             ADIO_EXPLICIT_OFFSET, off, &status, error_code);
            if (*error_code != MPI_SUCCESS) return;
        }

        off += size;
        done += size;
    }
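    /* Note (added commentary): ntimes can differ across processes, but
       the exchange routines called above are collective over fd->comm
       (each phase begins with an MPI_Alltoall of sizes), so every
       process must participate in the same number of phases. A process
       that has finished writing its own share of the file region
       therefore keeps entering the exchange for the remaining
       max_ntimes - ntimes phases below: it has nothing to receive, but
       it may still hold data that other, still-writing processes need. */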
    for (i=0; i<nprocs; i++) count[i] = recv_size[i] = 0;
#ifdef PROFILE
    MPE_Log_event(7, 0, "start communication");
#endif
    for (m=ntimes; m<max_ntimes; m++) {
        /* nothing to recv, but check for send. */
        if (bglmpio_comm == 1)
            ADIOI_W_Exchange_data(fd, buf, write_buf, flat_buf, offset_list,
                                  len_list, send_size, recv_size, off, size,
                                  count, start_pos, partial_recv, sent_to_proc,
                                  nprocs, myrank, buftype_is_contig,
                                  contig_access_count, min_st_offset, fd_size,
                                  fd_start, fd_end, others_req, send_buf_idx,
                                  curr_to_proc, done_to_proc, &hole, m,
                                  buftype_extent, buf_idx, error_code);
        else if (bglmpio_comm == 0)
            ADIOI_W_Exchange_data_alltoallv(fd, buf, write_buf, flat_buf,
                                  offset_list, len_list, send_size, recv_size,
                                  off, size, count, start_pos, partial_recv,
                                  sent_to_proc, nprocs, myrank,
                                  buftype_is_contig, contig_access_count,
                                  min_st_offset, fd_size, fd_start, fd_end,
                                  others_req, send_buf_idx, curr_to_proc,
                                  done_to_proc, &hole, m, buftype_extent,
                                  buf_idx, error_code);
    }

    if (*error_code != MPI_SUCCESS) return;
#ifdef PROFILE
    MPE_Log_event(8, 0, "end communication");
#endif

    if (ntimes) ADIOI_Free(write_buf);
    ADIOI_Free(curr_offlen_ptr);
    ADIOI_Free(count);
    ADIOI_Free(partial_recv);
    ADIOI_Free(send_size);
    ADIOI_Free(recv_size);
    ADIOI_Free(sent_to_proc);
    ADIOI_Free(start_pos);
    ADIOI_Free(send_buf_idx);
    ADIOI_Free(curr_to_proc);
    ADIOI_Free(done_to_proc);
}


/* Sets error_code to MPI_SUCCESS if successful, or creates an error code
 * in the case of error.
 */
static void ADIOI_W_Exchange_data(ADIO_File fd, void *buf, char *write_buf,
                                  ADIOI_Flatlist_node *flat_buf,
                                  ADIO_Offset *offset_list, int *len_list,
                                  int *send_size, int *recv_size,
                                  ADIO_Offset off, int size, int *count,
                                  int *start_pos, int *partial_recv,
                                  int *sent_to_proc, int nprocs, int myrank,
                                  int buftype_is_contig,
                                  int contig_access_count,
                                  ADIO_Offset min_st_offset,
                                  ADIO_Offset fd_size,
                                  ADIO_Offset *fd_start, ADIO_Offset *fd_end,
                                  ADIOI_Access *others_req,
                                  int *send_buf_idx, int *curr_to_proc,
                                  int *done_to_proc, int *hole, int iter,
                                  MPI_Aint buftype_extent, int *buf_idx,
                                  int *error_code)
{
    int i, j, k, *tmp_len, nprocs_recv, nprocs_send, err;
    char **send_buf = NULL;
    MPI_Request *requests, *send_req;
    MPI_Datatype *recv_types;
    MPI_Status *statuses, status;
    int *srt_len, sum;
    ADIO_Offset *srt_off;
    static char myname[] = "ADIOI_W_EXCHANGE_DATA";

/* exchange recv_size info so that each process knows how much to
   send to whom. */

    MPI_Alltoall(recv_size, 1, MPI_INT, send_size, 1, MPI_INT, fd->comm);

    /* create derived datatypes for recv */

    nprocs_recv = 0;
    for (i=0; i<nprocs; i++) if (recv_size[i]) nprocs_recv++;

    recv_types = (MPI_Datatype *)
        ADIOI_Malloc((nprocs_recv+1)*sizeof(MPI_Datatype));
    /* +1 to avoid a 0-size malloc */

    tmp_len = (int *) ADIOI_Malloc(nprocs*sizeof(int));
    j = 0;
    for (i=0; i<nprocs; i++) {
        if (recv_size[i]) {
            /* take care if the last off-len pair is a partial recv */
            if (partial_recv[i]) {
                k = start_pos[i] + count[i] - 1;
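/* Standalone sketch (not part of ad_bgl_wrcoll.c): the size-exchange
   handshake above is a single MPI_Alltoall of one int per peer, with
   this rank's recv_size[] doubling as the send buffer. The minimal
   program below demonstrates the pattern; the (myrank+1)*100 sizing is
   made up purely for illustration. */
#include <mpi.h>
#include <stdio.h>
#include <stdlib.h>

int main(int argc, char **argv)
{
    int nprocs, myrank, i;
    int *recv_size, *send_size;

    MPI_Init(&argc, &argv);
    MPI_Comm_size(MPI_COMM_WORLD, &nprocs);
    MPI_Comm_rank(MPI_COMM_WORLD, &myrank);

    recv_size = (int *) malloc(nprocs * sizeof(int));
    send_size = (int *) malloc(nprocs * sizeof(int));

    /* pretend this rank wants (myrank+1)*100 bytes from every peer */
    for (i = 0; i < nprocs; i++)
        recv_size[i] = (myrank + 1) * 100;

    /* after this call, send_size[i] on this rank equals recv_size[myrank]
       as filled in on rank i, i.e. each rank learns how much every peer
       expects from it */
    MPI_Alltoall(recv_size, 1, MPI_INT, send_size, 1, MPI_INT,
                 MPI_COMM_WORLD);

    for (i = 0; i < nprocs; i++)
        printf("rank %d owes %d bytes to rank %d\n",
               myrank, send_size[i], i);

    free(recv_size);
    free(send_size);
    MPI_Finalize();
    return 0;
}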