📄 ad_read_coll.c
字号:
if (flag) { ADIO_ReadContig(fd, read_buf+for_curr_iter, size, MPI_BYTE, ADIO_EXPLICIT_OFFSET, off, &status, error_code); if (*error_code != MPI_SUCCESS) return; } for_curr_iter = for_next_iter; ADIOI_R_Exchange_data(fd, buf, flat_buf, offset_list, len_list, send_size, recv_size, count, start_pos, partial_send, recd_from_proc, nprocs, myrank, buftype_is_contig, contig_access_count, min_st_offset, fd_size, fd_start, fd_end, others_req, m, buftype_extent, buf_idx); if (for_next_iter) { tmp_buf = (char *) ADIOI_Malloc(for_next_iter); memcpy(tmp_buf, read_buf+real_size-for_next_iter, for_next_iter); ADIOI_Free(read_buf); read_buf = (char *) ADIOI_Malloc(for_next_iter+coll_bufsize); memcpy(read_buf, tmp_buf, for_next_iter); ADIOI_Free(tmp_buf); } off += size; done += size; } for (i=0; i<nprocs; i++) count[i] = send_size[i] = 0; for (m=ntimes; m<max_ntimes; m++) /* nothing to send, but check for recv. */ ADIOI_R_Exchange_data(fd, buf, flat_buf, offset_list, len_list, send_size, recv_size, count, start_pos, partial_send, recd_from_proc, nprocs, myrank, buftype_is_contig, contig_access_count, min_st_offset, fd_size, fd_start, fd_end, others_req, m, buftype_extent, buf_idx); if (ntimes) ADIOI_Free(read_buf); ADIOI_Free(curr_offlen_ptr); ADIOI_Free(count); ADIOI_Free(partial_send); ADIOI_Free(send_size); ADIOI_Free(recv_size); ADIOI_Free(recd_from_proc); ADIOI_Free(start_pos);}static void ADIOI_R_Exchange_data(ADIO_File fd, void *buf, ADIOI_Flatlist_node *flat_buf, ADIO_Offset *offset_list, int *len_list, int *send_size, int *recv_size, int *count, int *start_pos, int *partial_send, int *recd_from_proc, int nprocs, int myrank, int buftype_is_contig, int contig_access_count, ADIO_Offset min_st_offset, ADIO_Offset fd_size, ADIO_Offset *fd_start, ADIO_Offset *fd_end, ADIOI_Access *others_req, int iter, MPI_Aint buftype_extent, int *buf_idx){ int i, j, k=0, tmp=0, nprocs_recv, nprocs_send; char **recv_buf = NULL; MPI_Request *requests; MPI_Datatype send_type; MPI_Status *statuses;/* exchange send_size info so that each process knows how much to receive from whom and how much memory to allocate. */ MPI_Alltoall(send_size, 1, MPI_INT, recv_size, 1, MPI_INT, fd->comm); nprocs_recv = 0; for (i=0; i < nprocs; i++) if (recv_size[i]) nprocs_recv++; nprocs_send = 0; for (i=0; i<nprocs; i++) if (send_size[i]) nprocs_send++; requests = (MPI_Request *) ADIOI_Malloc((nprocs_send+nprocs_recv+1)*sizeof(MPI_Request));/* +1 to avoid a 0-size malloc *//* post recvs. if buftype_is_contig, data can be directly recd. into user buf at location given by buf_idx. else use recv_buf. */ if (buftype_is_contig) { j = 0; for (i=0; i < nprocs; i++) if (recv_size[i]) { MPI_Irecv(((char *) buf) + buf_idx[i], recv_size[i], MPI_BYTE, i, myrank+i+100*iter, fd->comm, requests+j); j++; buf_idx[i] += recv_size[i]; } } else {/* allocate memory for recv_buf and post receives */ recv_buf = (char **) ADIOI_Malloc(nprocs * sizeof(char*)); for (i=0; i < nprocs; i++) if (recv_size[i]) recv_buf[i] = (char *) ADIOI_Malloc(recv_size[i]); j = 0; for (i=0; i < nprocs; i++) if (recv_size[i]) { MPI_Irecv(recv_buf[i], recv_size[i], MPI_BYTE, i, myrank+i+100*iter, fd->comm, requests+j); j++; /* FPRINTF(stderr, "node %d, recv_size %d, tag %d \n", myrank, recv_size[i], myrank+i+100*iter); */ } }/* create derived datatypes and send data */ j = 0; for (i=0; i<nprocs; i++) { if (send_size[i]) {/* take care if the last off-len pair is a partial send */ if (partial_send[i]) { k = start_pos[i] + count[i] - 1; tmp = others_req[i].lens[k]; others_req[i].lens[k] = partial_send[i]; } MPI_Type_hindexed(count[i], &(others_req[i].lens[start_pos[i]]), &(others_req[i].mem_ptrs[start_pos[i]]), MPI_BYTE, &send_type); /* absolute displacement; use MPI_BOTTOM in send */ MPI_Type_commit(&send_type); MPI_Isend(MPI_BOTTOM, 1, send_type, i, myrank+i+100*iter, fd->comm, requests+nprocs_recv+j); MPI_Type_free(&send_type); if (partial_send[i]) others_req[i].lens[k] = tmp; j++; } } statuses = (MPI_Status *) ADIOI_Malloc((nprocs_send+nprocs_recv+1) * \ sizeof(MPI_Status)); /* +1 to avoid a 0-size malloc */ /* wait on the receives */ if (nprocs_recv) {#ifdef NEEDS_MPI_TEST j = 0; while (!j) MPI_Testall(nprocs_recv, requests, &j, statuses);#else MPI_Waitall(nprocs_recv, requests, statuses);#endif /* if noncontiguous, to the copies from the recv buffers */ if (!buftype_is_contig) ADIOI_Fill_user_buffer(fd, buf, flat_buf, recv_buf, offset_list, len_list, recv_size, requests, statuses, recd_from_proc, nprocs, contig_access_count, min_st_offset, fd_size, fd_start, fd_end, buftype_extent); } /* wait on the sends*/ MPI_Waitall(nprocs_send, requests+nprocs_recv, statuses+nprocs_recv); ADIOI_Free(statuses); ADIOI_Free(requests); if (!buftype_is_contig) { for (i=0; i < nprocs; i++) if (recv_size[i]) ADIOI_Free(recv_buf[i]); ADIOI_Free(recv_buf); }}#define ADIOI_BUF_INCR \{ \ while (buf_incr) { \ size_in_buf = ADIOI_MIN(buf_incr, flat_buf_sz); \ user_buf_idx += size_in_buf; \ flat_buf_sz -= size_in_buf; \ if (!flat_buf_sz) { \ if (flat_buf_idx < (flat_buf->count - 1)) flat_buf_idx++; \ else { \ flat_buf_idx = 0; \ n_buftypes++; \ } \ user_buf_idx = flat_buf->indices[flat_buf_idx] + \ n_buftypes*buftype_extent; \ flat_buf_sz = flat_buf->blocklens[flat_buf_idx]; \ } \ buf_incr -= size_in_buf; \ } \}#define ADIOI_BUF_COPY \{ \ while (size) { \ size_in_buf = ADIOI_MIN(size, flat_buf_sz); \ memcpy(((char *) buf) + user_buf_idx, \ &(recv_buf[p][recv_buf_idx[p]]), size_in_buf); \ recv_buf_idx[p] += size_in_buf; \ user_buf_idx += size_in_buf; \ flat_buf_sz -= size_in_buf; \ if (!flat_buf_sz) { \ if (flat_buf_idx < (flat_buf->count - 1)) flat_buf_idx++; \ else { \ flat_buf_idx = 0; \ n_buftypes++; \ } \ user_buf_idx = flat_buf->indices[flat_buf_idx] + \ n_buftypes*buftype_extent; \ flat_buf_sz = flat_buf->blocklens[flat_buf_idx]; \ } \ size -= size_in_buf; \ buf_incr -= size_in_buf; \ } \ ADIOI_BUF_INCR \}static void ADIOI_Fill_user_buffer(ADIO_File fd, void *buf, ADIOI_Flatlist_node *flat_buf, char **recv_buf, ADIO_Offset *offset_list, int *len_list, int *recv_size, MPI_Request *requests, MPI_Status *statuses, int *recd_from_proc, int nprocs, int contig_access_count, ADIO_Offset min_st_offset, ADIO_Offset fd_size, ADIO_Offset *fd_start, ADIO_Offset *fd_end, MPI_Aint buftype_extent){/* this function is only called if buftype is not contig */ int i, p, flat_buf_idx, size, buf_incr; int flat_buf_sz, size_in_buf, n_buftypes; ADIO_Offset off, len, rem_len, user_buf_idx; int *curr_from_proc, *done_from_proc, *recv_buf_idx; ADIOI_UNREFERENCED_ARG(requests); ADIOI_UNREFERENCED_ARG(statuses);/* curr_from_proc[p] = amount of data recd from proc. p that has already been accounted for so far done_from_proc[p] = amount of data already recd from proc. p and filled into user buffer in previous iterations user_buf_idx = current location in user buffer recv_buf_idx[p] = current location in recv_buf of proc. p */ curr_from_proc = (int *) ADIOI_Malloc(nprocs * sizeof(int)); done_from_proc = (int *) ADIOI_Malloc(nprocs * sizeof(int)); recv_buf_idx = (int *) ADIOI_Malloc(nprocs * sizeof(int)); for (i=0; i < nprocs; i++) { recv_buf_idx[i] = curr_from_proc[i] = 0; done_from_proc[i] = recd_from_proc[i]; } user_buf_idx = flat_buf->indices[0]; flat_buf_idx = 0; n_buftypes = 0; flat_buf_sz = flat_buf->blocklens[0]; /* flat_buf_idx = current index into flattened buftype flat_buf_sz = size of current contiguous component in flattened buf */ for (i=0; i<contig_access_count; i++) { off = offset_list[i]; rem_len = (ADIO_Offset) len_list[i]; /* this request may span the file domains of more than one process */ while (rem_len != 0) { len = rem_len; /* NOTE: len value is modified by ADIOI_Calc_aggregator() to be no * longer than the single region that processor "p" is responsible * for. */ p = ADIOI_Calc_aggregator(fd, off, min_st_offset, &len, fd_size, fd_start, fd_end); if (recv_buf_idx[p] < recv_size[p]) { if (curr_from_proc[p]+len > done_from_proc[p]) { if (done_from_proc[p] > curr_from_proc[p]) { size = (int)ADIOI_MIN(curr_from_proc[p] + len - done_from_proc[p], recv_size[p]-recv_buf_idx[p]); buf_incr = done_from_proc[p] - curr_from_proc[p]; ADIOI_BUF_INCR buf_incr = (int)(curr_from_proc[p]+len-done_from_proc[p]); curr_from_proc[p] = done_from_proc[p] + size; ADIOI_BUF_COPY } else { size = (int)ADIOI_MIN(len,recv_size[p]-recv_buf_idx[p]); buf_incr = (int)len; curr_from_proc[p] += size; ADIOI_BUF_COPY } } else { curr_from_proc[p] += (int)len; buf_incr = (int)len; ADIOI_BUF_INCR } } else { buf_incr = (int)len; ADIOI_BUF_INCR } off += len; rem_len -= len; } } for (i=0; i < nprocs; i++) if (recv_size[i]) recd_from_proc[i] = curr_from_proc[i]; ADIOI_Free(curr_from_proc); ADIOI_Free(done_from_proc); ADIOI_Free(recv_buf_idx);}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -