📄 ad_bgl_rdcoll.c
字号:
offset_list[0] = (file_ptr_type == ADIO_INDIVIDUAL) ? fd->fp_ind : fd->disp + etype_size * offset; len_list[0] = bufcount * buftype_size; *start_offset_ptr = offset_list[0]; *end_offset_ptr = offset_list[0] + len_list[0] - 1; /* update file pointer */ if (file_ptr_type == ADIO_INDIVIDUAL) fd->fp_ind = *end_offset_ptr + 1; } else { /* First calculate what size of offset_list and len_list to allocate */ /* filetype already flattened in ADIO_Open or ADIO_Fcntl */ flat_file = ADIOI_Flatlist; while (flat_file->type != fd->filetype) flat_file = flat_file->next; disp = fd->disp; if (file_ptr_type == ADIO_INDIVIDUAL) { offset = fd->fp_ind; /* in bytes */ n_filetypes = -1; flag = 0; while (!flag) { n_filetypes++; for (i=0; i<flat_file->count; i++) { if (disp + flat_file->indices[i] + (ADIO_Offset) n_filetypes*filetype_extent + flat_file->blocklens[i] >= offset) { st_index = i; frd_size = (int) (disp + flat_file->indices[i] + (ADIO_Offset) n_filetypes*filetype_extent + flat_file->blocklens[i] - offset); flag = 1; break; } } } } else { n_etypes_in_filetype = filetype_size/etype_size; n_filetypes = (int) (offset / n_etypes_in_filetype); etype_in_filetype = (int) (offset % n_etypes_in_filetype); size_in_filetype = etype_in_filetype * etype_size; sum = 0; for (i=0; i<flat_file->count; i++) { sum += flat_file->blocklens[i]; if (sum > size_in_filetype) { st_index = i; frd_size = sum - size_in_filetype; abs_off_in_filetype = flat_file->indices[i] + size_in_filetype - (sum - flat_file->blocklens[i]); break; } } /* abs. offset in bytes in the file */ offset = disp + (ADIO_Offset) n_filetypes*filetype_extent + abs_off_in_filetype; } /* calculate how much space to allocate for offset_list, len_list */ old_frd_size = frd_size; contig_access_count = i = 0; j = st_index; bufsize = buftype_size * bufcount; frd_size = ADIOI_MIN(frd_size, bufsize); while (i < bufsize) { if (frd_size) contig_access_count++; i += frd_size; j = (j + 1) % flat_file->count; frd_size = ADIOI_MIN(flat_file->blocklens[j], bufsize-i); } /* allocate space for offset_list and len_list */ *offset_list_ptr = (ADIO_Offset *) ADIOI_Malloc((contig_access_count+1)*sizeof(ADIO_Offset)); *len_list_ptr = (int *) ADIOI_Malloc((contig_access_count+1)*sizeof(int)); /* +1 to avoid a 0-size malloc */ offset_list = *offset_list_ptr; len_list = *len_list_ptr; /* find start offset, end offset, and fill in offset_list and len_list */ *start_offset_ptr = offset; /* calculated above */ i = k = 0; j = st_index; off = offset; frd_size = ADIOI_MIN(old_frd_size, bufsize); while (i < bufsize) { if (frd_size) { offset_list[k] = off; len_list[k] = frd_size; k++; } i += frd_size; end_offset = off + frd_size - 1; /* Note: end_offset points to the last byte-offset that will be accessed. e.g., if start_offset=0 and 100 bytes to be read, end_offset=99*/ if (off + frd_size < disp + flat_file->indices[j] + flat_file->blocklens[j] + (ADIO_Offset) n_filetypes*filetype_extent) { off += frd_size; /* did not reach end of contiguous block in filetype. * no more I/O needed. off is incremented by frd_size. */ } else { if (j < (flat_file->count - 1)) j++; else { /* hit end of flattened filetype; * start at beginning again */ j = 0; n_filetypes++; } off = disp + flat_file->indices[j] + (ADIO_Offset) n_filetypes*filetype_extent; frd_size = ADIOI_MIN(flat_file->blocklens[j], bufsize-i); } } /* update file pointer */ if (file_ptr_type == ADIO_INDIVIDUAL) fd->fp_ind = off; *contig_access_count_ptr = contig_access_count; *end_offset_ptr = end_offset; }}#endifstatic void ADIOI_Read_and_exch(ADIO_File fd, void *buf, MPI_Datatype datatype, int nprocs, int myrank, ADIOI_Access *others_req, ADIO_Offset *offset_list, int *len_list, int contig_access_count, ADIO_Offset min_st_offset, ADIO_Offset fd_size, ADIO_Offset *fd_start, ADIO_Offset *fd_end, int *buf_idx, int *error_code){/* Read in sizes of no more than coll_bufsize, an info parameter. Send data to appropriate processes. Place recd. data in user buf. The idea is to reduce the amount of extra memory required for collective I/O. If all data were read all at once, which is much easier, it would require temp space more than the size of user_buf, which is often unacceptable. For example, to read a distributed array from a file, where each local array is 8Mbytes, requiring at least another 8Mbytes of temp space is unacceptable. */ int i, j, m, size, ntimes, max_ntimes, buftype_is_contig; ADIO_Offset st_loc=-1, end_loc=-1, off, done, real_off, req_off; char *read_buf = NULL, *tmp_buf; int *curr_offlen_ptr, *count, *send_size, *recv_size; int *partial_send, *recd_from_proc, *start_pos, for_next_iter; int real_size, req_len, flag, for_curr_iter, rank; MPI_Status status; ADIOI_Flatlist_node *flat_buf=NULL; MPI_Aint buftype_extent; int coll_bufsize; int iii; *error_code = MPI_SUCCESS; /* changed below if error */ /* only I/O errors are currently reported */ /* calculate the number of reads of size coll_bufsize to be done by each process and the max among all processes. That gives the no. of communication phases as well. coll_bufsize is obtained from the hints object. */ coll_bufsize = fd->hints->cb_buffer_size; /* grab some initial values for st_loc and end_loc */ for (i=0; i < nprocs; i++) { if (others_req[i].count) { st_loc = others_req[i].offsets[0]; end_loc = others_req[i].offsets[0]; break; } } /* now find the real values */ for (i=0; i < nprocs; i++) for (j=0; j<others_req[i].count; j++) { st_loc = ADIOI_MIN(st_loc, others_req[i].offsets[j]); end_loc = ADIOI_MAX(end_loc, (others_req[i].offsets[j] + others_req[i].lens[j] - 1)); } /* calculate ntimes, the number of times this process must perform I/O * operations in order to complete all the requests it has received. * the need for multiple I/O operations comes from the restriction that * we only use coll_bufsize bytes of memory for internal buffering. */ if ((st_loc==-1) && (end_loc==-1)) { /* this process does no I/O. */ ntimes = 0; } else { /* ntimes=ceiling_div(end_loc - st_loc + 1, coll_bufsize)*/ ntimes = (int) ((end_loc - st_loc + coll_bufsize)/coll_bufsize); } MPI_Allreduce(&ntimes, &max_ntimes, 1, MPI_INT, MPI_MAX, fd->comm); if (ntimes) read_buf = (char *) ADIOI_Malloc(coll_bufsize); curr_offlen_ptr = (int *) ADIOI_Calloc(nprocs, sizeof(int)); /* its use is explained below. calloc initializes to 0. */ count = (int *) ADIOI_Malloc(nprocs * sizeof(int)); /* to store count of how many off-len pairs per proc are satisfied in an iteration. */ partial_send = (int *) ADIOI_Calloc(nprocs, sizeof(int)); /* if only a portion of the last off-len pair is sent to a process in a particular iteration, the length sent is stored here. calloc initializes to 0. */ send_size = (int *) ADIOI_Malloc(nprocs * sizeof(int)); /* total size of data to be sent to each proc. in an iteration */ recv_size = (int *) ADIOI_Malloc(nprocs * sizeof(int)); /* total size of data to be recd. from each proc. in an iteration. Of size nprocs so that I can use MPI_Alltoall later. */ recd_from_proc = (int *) ADIOI_Calloc(nprocs, sizeof(int)); /* amount of data recd. so far from each proc. Used in ADIOI_Fill_user_buffer. initialized to 0 here. */ start_pos = (int *) ADIOI_Malloc(nprocs*sizeof(int)); /* used to store the starting value of curr_offlen_ptr[i] in this iteration */ ADIOI_Datatype_iscontig(datatype, &buftype_is_contig); if (!buftype_is_contig) { ADIOI_Flatten_datatype(datatype); flat_buf = ADIOI_Flatlist; while (flat_buf->type != datatype) flat_buf = flat_buf->next; } MPI_Type_extent(datatype, &buftype_extent); done = 0; off = st_loc; for_curr_iter = for_next_iter = 0; MPI_Comm_rank(fd->comm, &rank);#ifdef PROFILE MPE_Log_event(14, 0, "end computation");#endif for (m=0; m<ntimes; m++) { /* read buf of size coll_bufsize (or less) */ /* go through all others_req and check if any are satisfied by the current read */ /* since MPI guarantees that displacements in filetypes are in monotonically nondecreasing order, I can maintain a pointer (curr_offlen_ptr) to current off-len pair for each process in others_req and scan further only from there. There is still a problem of filetypes such as: (1, 2, 3 are not process nos. They are just numbers for three chunks of data, specified by a filetype.) 1 -------!-- 2 -----!---- 3 --!----- where ! indicates where the current read_size limitation cuts through the filetype. I resolve this by reading up to !, but filling the communication buffer only for 1. I copy the portion left over for 2 into a tmp_buf for use in the next iteration. i.e., 2 and 3 will be satisfied in the next iteration. This simplifies filling in the user's buf at the other end, as only one off-len pair with incomplete data will be sent. I also don't need to send the individual offsets and lens along with the data, as the data is being sent in a particular order. */ /* off = start offset in the file for the data actually read in this iteration size = size of data read corresponding to off real_off = off minus whatever data was retained in memory from previous iteration for cases like 2, 3 illustrated above real_size = size plus the extra corresponding to real_off req_off = off in file for a particular contiguous request minus what was satisfied in previous iteration req_size = size corresponding to req_off */#ifdef PROFILE MPE_Log_event(13, 0, "start computation");#endif size = (int) (ADIOI_MIN(coll_bufsize, end_loc-st_loc+1-done)); real_off = off - for_curr_iter; real_size = size + for_curr_iter; for (i=0; i<nprocs; i++) count[i] = send_size[i] = 0; for_next_iter = 0; for (i=0; i<nprocs; i++) { /* FPRINTF(stderr, "rank %d, i %d, others_count %d\n", rank, i, others_req[i].count); */ if (others_req[i].count) { start_pos[i] = curr_offlen_ptr[i]; for (j=curr_offlen_ptr[i]; j<others_req[i].count; j++) { if (partial_send[i]) { /* this request may have been partially satisfied in the previous iteration. */ req_off = others_req[i].offsets[j] + partial_send[i]; req_len = others_req[i].lens[j] - partial_send[i]; partial_send[i] = 0; /* modify the off-len pair to reflect this change */ others_req[i].offsets[j] = req_off; others_req[i].lens[j] = req_len; } else { req_off = others_req[i].offsets[j]; req_len = others_req[i].lens[j]; } if (req_off < real_off + real_size) { count[i]++; MPI_Address(read_buf+req_off-real_off, &(others_req[i].mem_ptrs[j])); send_size[i] += (int)(ADIOI_MIN(real_off + (ADIO_Offset)real_size - req_off, req_len)); if (real_off+real_size-req_off < req_len) { partial_send[i] = (int) (real_off+real_size- req_off); if ((j+1 < others_req[i].count) && (others_req[i].offsets[j+1] < real_off+real_size)) { /* this is the case illustrated in the figure above. */ for_next_iter = (int) (ADIOI_MAX(for_next_iter, real_off + real_size - others_req[i].offsets[j+1])); /* max because it must cover requests from different processes */ } break; } } else break; } curr_offlen_ptr[i] = j; } } flag = 0; for (i=0; i<nprocs; i++) if (count[i]) flag = 1;#ifdef PROFILE MPE_Log_event(14, 0, "end computation");#endif if (flag) { ADIO_ReadContig(fd, read_buf+for_curr_iter, size, MPI_BYTE, ADIO_EXPLICIT_OFFSET, off, &status, error_code);/* printf( "\tread_coll: 700, data read [%3d] = ", size ); for (iii=0; iii<size; iii++) { printf( "%3d,", *((unsigned char *)read_buf + for_curr_iter + iii) ); } printf( "\n" ); */ if (*error_code != MPI_SUCCESS) return; } for_curr_iter = for_next_iter; #ifdef PROFILE MPE_Log_event(7, 0, "start communication");#endif if (bglmpio_comm == 1) ADIOI_R_Exchange_data(fd, buf, flat_buf, offset_list, len_list, send_size, recv_size, count, start_pos, partial_send, recd_from_proc, nprocs, myrank, buftype_is_contig, contig_access_count, min_st_offset, fd_size, fd_start, fd_end, others_req, m, buftype_extent, buf_idx); else if (bglmpio_comm == 0) { ADIOI_R_Exchange_data_alltoallv(fd, buf, flat_buf, offset_list, len_list, send_size, recv_size, count, start_pos, partial_send, recd_from_proc, nprocs, myrank, buftype_is_contig, contig_access_count, min_st_offset, fd_size, fd_start, fd_end, others_req, m, buftype_extent, buf_idx); }#ifdef PROFILE MPE_Log_event(8, 0, "end communication");#endif if (for_next_iter) { tmp_buf = (char *) ADIOI_Malloc(for_next_iter); memcpy(tmp_buf, read_buf+real_size-for_next_iter, for_next_iter); ADIOI_Free(read_buf); read_buf = (char *) ADIOI_Malloc(for_next_iter+coll_bufsize); memcpy(read_buf, tmp_buf, for_next_iter); ADIOI_Free(tmp_buf); } off += size; done += size; } for (i=0; i<nprocs; i++) count[i] = send_size[i] = 0;#ifdef PROFILE MPE_Log_event(7, 0, "start communication");#endif for (m=ntimes; m<max_ntimes; m++) /* nothing to send, but check for recv. */ if (bglmpio_comm == 1) ADIOI_R_Exchange_data(fd, buf, flat_buf, offset_list, len_list, send_size, recv_size, count, start_pos, partial_send, recd_from_proc, nprocs, myrank,
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -