📄 ad_bgl_wrcoll.c
字号:
tmp_len[i] = others_req[i].lens[k]; others_req[i].lens[k] = partial_recv[i]; } MPI_Type_hindexed(count[i], &(others_req[i].lens[start_pos[i]]), &(others_req[i].mem_ptrs[start_pos[i]]), MPI_BYTE, recv_types+j); /* absolute displacements; use MPI_BOTTOM in recv */ MPI_Type_commit(recv_types+j); j++; } } /* To avoid a read-modify-write, check if there are holes in the data to be written. For this, merge the (sorted) offset lists others_req using a heap-merge. */ sum = 0; for (i=0; i<nprocs; i++) sum += count[i]; srt_off = (ADIO_Offset *) ADIOI_Malloc((sum+1)*sizeof(ADIO_Offset)); srt_len = (int *) ADIOI_Malloc((sum+1)*sizeof(int)); /* +1 to avoid a 0-size malloc */ ADIOI_Heap_merge(others_req, count, srt_off, srt_len, start_pos, nprocs, nprocs_recv, sum);/* for partial recvs, restore original lengths */ for (i=0; i<nprocs; i++) if (partial_recv[i]) { k = start_pos[i] + count[i] - 1; others_req[i].lens[k] = tmp_len[i]; } ADIOI_Free(tmp_len);/* check if there are any holes */ *hole = 0; /* See if there are holes before the first request or after the last request*/ if((srt_off[0] > off) || ((srt_off[sum-1] + srt_len[sum-1]) < (off + size))) { *hole = 1; } else /* See if there are holes between the requests, if there are more than one */ for (i=0; i<sum-1; i++) if (srt_off[i]+srt_len[i] < srt_off[i+1]) { *hole = 1; break; } ADIOI_Free(srt_off); ADIOI_Free(srt_len); if (nprocs_recv) { if (*hole) { ADIO_ReadContig(fd, write_buf, size, MPI_BYTE, ADIO_EXPLICIT_OFFSET, off, &status, &err); /* --BEGIN ERROR HANDLING-- */ if (err != MPI_SUCCESS) { *error_code = MPIO_Err_create_code(err, MPIR_ERR_RECOVERABLE, myname, __LINE__, MPI_ERR_IO, "**ioRMWrdwr", 0); return; } /* --END ERROR HANDLING-- */ } } nprocs_send = 0; for (i=0; i < nprocs; i++) if (send_size[i]) nprocs_send++; if (fd->atomicity) { /* bug fix from Wei-keng Liao and Kenin Coloma */ requests = (MPI_Request *) ADIOI_Malloc((nprocs_send+1)*sizeof(MPI_Request)); send_req = requests; } else { requests = (MPI_Request *) 
ADIOI_Malloc((nprocs_send+nprocs_recv+1)*sizeof(MPI_Request)); /* +1 to avoid a 0-size malloc */ /* post receives */ j = 0; for (i=0; i<nprocs; i++) { if (recv_size[i]) { MPI_Irecv(MPI_BOTTOM, 1, recv_types[j], i, myrank+i+100*iter, fd->comm, requests+j); j++; } } send_req = requests + nprocs_recv; }/* post sends. if buftype_is_contig, data can be directly sent from user buf at location given by buf_idx. else use send_buf. */ if (buftype_is_contig) { j = 0; for (i=0; i < nprocs; i++) if (send_size[i]) { MPI_Isend(((char *) buf) + buf_idx[i], send_size[i], MPI_BYTE, i, myrank+i+100*iter, fd->comm, send_req+j); j++; buf_idx[i] += send_size[i]; } } else if (nprocs_send) { /* buftype is not contig */ send_buf = (char **) ADIOI_Malloc(nprocs*sizeof(char*)); for (i=0; i < nprocs; i++) if (send_size[i]) send_buf[i] = (char *) ADIOI_Malloc(send_size[i]); ADIOI_Fill_send_buffer(fd, buf, flat_buf, send_buf, offset_list, len_list, send_size, send_req, sent_to_proc, nprocs, myrank, contig_access_count, min_st_offset, fd_size, fd_start, fd_end, send_buf_idx, curr_to_proc, done_to_proc, iter, buftype_extent); /* the send is done in ADIOI_Fill_send_buffer */ } if (fd->atomicity) { /* bug fix from Wei-keng Liao and Kenin Coloma */ j = 0; for (i=0; i<nprocs; i++) { MPI_Status wkl_status; if (recv_size[i]) { MPI_Recv(MPI_BOTTOM, 1, recv_types[j], i, myrank+i+100*iter, fd->comm, &wkl_status); j++; } } } for (i=0; i<nprocs_recv; i++) MPI_Type_free(recv_types+i); ADIOI_Free(recv_types); if (fd->atomicity) { /* bug fix from Wei-keng Liao and Kenin Coloma */ statuses = (MPI_Status *) ADIOI_Malloc((nprocs_send+1) * \ sizeof(MPI_Status)); /* +1 to avoid a 0-size malloc */ } else { statuses = (MPI_Status *) ADIOI_Malloc((nprocs_send+nprocs_recv+1) * \ sizeof(MPI_Status)); /* +1 to avoid a 0-size malloc */ }#ifdef NEEDS_MPI_TEST i = 0; if (fd->atomicity) { /* bug fix from Wei-keng Liao and Kenin Coloma */ while (!i) MPI_Testall(nprocs_send, send_req, &i, statuses); } else { while (!i) 
MPI_Testall(nprocs_send+nprocs_recv, requests, &i, statuses);
    }
#else
    if (fd->atomicity)
        /* bug fix from Wei-keng Liao and Kenin Coloma */
        MPI_Waitall(nprocs_send, send_req, statuses);
    else
        MPI_Waitall(nprocs_send+nprocs_recv, requests, statuses);
#endif

    ADIOI_Free(statuses);
    ADIOI_Free(requests);
    if (!buftype_is_contig && nprocs_send) {
        for (i=0; i < nprocs; i++)
            if (send_size[i])
                ADIOI_Free(send_buf[i]);
        ADIOI_Free(send_buf);
    }
}


/* ADIOI_BUF_INCR: advance the current position in the (noncontiguous)
 * user buffer by buf_incr bytes WITHOUT copying anything.
 *
 * The macro is not hygienic: it reads and writes these variables of the
 * enclosing function by name:
 *     buf_incr      bytes still to skip; always 0 when the macro finishes
 *     size_in_buf   scratch: bytes consumed in the current step
 *     flat_buf_sz   bytes left in the current contiguous piece
 *     flat_buf_idx  index of the current piece in flat_buf
 *     n_buftypes    number of times the piece list has wrapped around
 *     user_buf_idx  current byte offset relative to buf
 * and reads flat_buf (count, indices, blocklens) and buftype_extent.
 *
 * Each step consumes min(buf_incr, flat_buf_sz) bytes.  When the current
 * piece is used up, the macro moves to the next piece; after the last
 * piece it wraps to piece 0 and increments n_buftypes.  user_buf_idx is
 * then recomputed as indices[flat_buf_idx] + n_buftypes*buftype_extent.
 *
 * The expansion is a bare brace block (not do { } while (0)), and every
 * use below is written without a trailing semicolon.
 */
#define ADIOI_BUF_INCR \
{ \
    while (buf_incr) { \
        size_in_buf = ADIOI_MIN(buf_incr, flat_buf_sz); \
        user_buf_idx += size_in_buf; \
        flat_buf_sz -= size_in_buf; \
        if (!flat_buf_sz) { \
            if (flat_buf_idx < (flat_buf->count - 1)) flat_buf_idx++; \
            else { \
                flat_buf_idx = 0; \
                n_buftypes++; \
            } \
            user_buf_idx = flat_buf->indices[flat_buf_idx] + \
                              n_buftypes*buftype_extent; \
            flat_buf_sz = flat_buf->blocklens[flat_buf_idx]; \
        } \
        buf_incr -= size_in_buf; \
    } \
}


/* ADIOI_BUF_COPY: copy `size` bytes from the user buffer, starting at
 * user_buf_idx, to send_buf[p] starting at send_buf_idx[p].
 *
 * The source is walked piece by piece exactly as in ADIOI_BUF_INCR, and
 * the same enclosing-scope variables are updated; in addition the macro
 * uses size, p, buf, send_buf and send_buf_idx.  Both size and buf_incr
 * are decremented by every chunk copied, so the caller must set buf_incr
 * to the total distance the user-buffer position has to move (copied
 * bytes plus any bytes to be skipped afterwards).  Once size reaches 0,
 * whatever remains in buf_incr is skipped by the trailing ADIOI_BUF_INCR.
 */
#define ADIOI_BUF_COPY \
{ \
    while (size) { \
        size_in_buf = ADIOI_MIN(size, flat_buf_sz); \
        memcpy(&(send_buf[p][send_buf_idx[p]]), \
               ((char *) buf) + user_buf_idx, size_in_buf); \
        send_buf_idx[p] += size_in_buf; \
        user_buf_idx += size_in_buf; \
        flat_buf_sz -= size_in_buf; \
        if (!flat_buf_sz) { \
            if (flat_buf_idx < (flat_buf->count - 1)) flat_buf_idx++; \
            else { \
                flat_buf_idx = 0; \
                n_buftypes++; \
            } \
            user_buf_idx = flat_buf->indices[flat_buf_idx] + \
                              n_buftypes*buftype_extent; \
            flat_buf_sz = flat_buf->blocklens[flat_buf_idx]; \
        } \
        size -= size_in_buf; \
        buf_incr -= size_in_buf; \
    } \
    ADIOI_BUF_INCR \
}


/* ADIOI_Fill_send_buffer: pack data from a noncontiguous user buffer into
 * the per-process send buffers and post a nonblocking send for each
 * buffer as soon as it is full.
 *
 * The function always restarts at the beginning of the user buffer and of
 * the access list.  For every piece of every access it asks
 * ADIOI_BGL_Calc_aggregator() which process p the piece belongs to, and
 * then either skips the corresponding bytes of the user buffer or copies
 * them into send_buf[p], depending on how much was already delivered to p
 * (sent_to_proc[p] on entry) and on how much room is left in send_buf[p].
 *
 * Parameters, as used by the code below:
 *   fd                  passed to ADIOI_BGL_Calc_aggregator(); fd->comm is
 *                       the communicator of the MPI_Isend calls
 *   buf                 base address of the user buffer
 *   flat_buf            flattened buffer datatype; count, indices[] and
 *                       blocklens[] are read
 *   send_buf            send_buf[p] is the destination buffer for process p
 *   offset_list,
 *   len_list            the contig_access_count accesses (offset, length)
 *   send_size           send_size[p] is the number of bytes to place in
 *                       send_buf[p]; it is also the count of the send
 *   requests            receives the request handles of the sends posted
 *                       here, filled from index 0 upwards
 *                       NOTE(review): presumably sized by the caller for
 *                       one entry per process with nonzero send_size --
 *                       confirm against the caller
 *   sent_to_proc        in: bytes handled for each process before this
 *                       call; out: overwritten with curr_to_proc[p] for
 *                       every p whose send_size[p] is nonzero
 *   nprocs              length of the per-process arrays
 *   myrank, iter        only used to build the message tag
 *   contig_access_count number of entries in offset_list / len_list
 *   min_st_offset, fd_size,
 *   fd_start, fd_end    forwarded unchanged to ADIOI_BGL_Calc_aggregator()
 *   send_buf_idx,
 *   curr_to_proc,
 *   done_to_proc        caller-provided work arrays of nprocs ints,
 *                       (re)initialized here
 *   buftype_extent      added to the piece offsets once per completed pass
 *                       over flat_buf (see ADIOI_BUF_INCR)
 *
 * The return values of the MPI calls are not checked.
 */
static void ADIOI_Fill_send_buffer(ADIO_File fd, void *buf, ADIOI_Flatlist_node
                           *flat_buf, char **send_buf, ADIO_Offset
                           *offset_list, int *len_list, int *send_size,
                           MPI_Request *requests, int *sent_to_proc,
                           int nprocs, int myrank,
                           int contig_access_count,
                           ADIO_Offset min_st_offset, ADIO_Offset fd_size,
                           ADIO_Offset *fd_start, ADIO_Offset *fd_end,
                           int *send_buf_idx, int *curr_to_proc,
                           int *done_to_proc, int iter,
                           MPI_Aint buftype_extent)
{
/* this function is only called if buftype is not contig */

    int i, p, flat_buf_idx, size;
    int flat_buf_sz, buf_incr, size_in_buf, jj, n_buftypes;
    ADIO_Offset off, len, rem_len, user_buf_idx;

/*  curr_to_proc[p] = amount of data sent to proc. p that has already
                      been accounted for so far
    done_to_proc[p] = amount of data already sent to proc. p in
                      previous iterations
    user_buf_idx = current location in user buffer
    send_buf_idx[p] = current location in send_buf of proc. p  */

    /* reset the per-process cursors; done_to_proc is a snapshot of what
       earlier calls already handled */
    for (i=0; i < nprocs; i++) {
        send_buf_idx[i] = curr_to_proc[i] = 0;
        done_to_proc[i] = sent_to_proc[i];
    }
    jj = 0;    /* next free slot in requests[] */

    /* start at the first piece of the first instance of the buftype */
    user_buf_idx = flat_buf->indices[0];
    flat_buf_idx = 0;
    n_buftypes = 0;
    flat_buf_sz = flat_buf->blocklens[0];

    /* flat_buf_idx = current index into flattened buftype
       flat_buf_sz = size of current contiguous component in
                     flattened buf */

    for (i=0; i<contig_access_count; i++) {
        off     = offset_list[i];
        rem_len = (ADIO_Offset) len_list[i];

        /* this request may span the file domains of more than one
           process */
        while (rem_len != 0) {
            len = rem_len;
            /* NOTE: len value is modified by ADIOI_BGL_Calc_aggregator()
             * to be no longer than the single region that processor "p"
             * is responsible for.
             */
            p = ADIOI_BGL_Calc_aggregator(fd,
                                          off,
                                          min_st_offset,
                                          &len,
                                          fd_size,
                                          fd_start,
                                          fd_end);

            /* NOTE(review): len is an ADIO_Offset that is narrowed to int
               in several places below; this presumably relies on a single
               piece never exceeding INT_MAX bytes -- confirm */
            if (send_buf_idx[p] < send_size[p]) {
                /* send_buf[p] still has room */
                if (curr_to_proc[p]+len > done_to_proc[p]) {
                    /* at least part of this piece has not been handled
                       by an earlier call */
                    if (done_to_proc[p] > curr_to_proc[p]) {
                        /* the front of the piece was already handled:
                           skip that part, then copy what fits */
                        size = (int)ADIOI_MIN(curr_to_proc[p] + len -
                                done_to_proc[p], send_size[p]-send_buf_idx[p]);
                        buf_incr = done_to_proc[p] - curr_to_proc[p];
                        ADIOI_BUF_INCR
                        /* buf_incr now covers the whole unhandled rest of
                           the piece; ADIOI_BUF_COPY copies `size` bytes of
                           it and skips the remainder */
                        buf_incr = (int)(curr_to_proc[p] + len - done_to_proc[p]);
                        curr_to_proc[p] = done_to_proc[p] + size;
                        ADIOI_BUF_COPY
                    }
                    else {
                        /* nothing of this piece was handled before:
                           copy what fits, skip the rest */
                        size = (int)ADIOI_MIN(len,send_size[p]-send_buf_idx[p]);
                        buf_incr = (int)len;
                        curr_to_proc[p] += size;
                        ADIOI_BUF_COPY
                    }
                    if (send_buf_idx[p] == send_size[p]) {
                        /* buffer for p is complete: send it.  The outer
                           test on send_buf_idx[p] keeps this branch from
                           being reached again for the same p in this
                           call.  The tag uses the same expression as the
                           receives posted earlier in this file. */
                        MPI_Isend(send_buf[p], send_size[p], MPI_BYTE, p,
                                  myrank+p+100*iter, fd->comm, requests+jj);
                        jj++;
                    }
                }
                else {
                    /* the whole piece was handled by an earlier call:
                       account for it and skip it in the user buffer */
                    curr_to_proc[p] += (int)len;
                    buf_incr = (int)len;
                    ADIOI_BUF_INCR
                }
            }
            else {
                /* no room left in send_buf[p] (this includes
                   send_size[p] == 0): only move the user-buffer
                   position forward */
                buf_incr = (int)len;
                ADIOI_BUF_INCR
            }
            off     += len;
            rem_len -= len;
        }
    }
    /* record the progress made for every process that was sent data */
    for (i=0; i < nprocs; i++)
        if (send_size[i]) sent_to_proc[i] = curr_to_proc[i];
}


static void ADIOI_Heap_merge(ADIOI_Access *others_req, int *count,
                      ADIO_Offset *srt_off, int *srt_len, int *start_pos,
                      int nprocs, int nprocs_recv, int total_elements)
{
    typedef struct {
        ADIO_Offset *off_list;
        int *len_list;
        int nelem;
    } heap_struct;

    heap_struct *a, tmp;
    int i, j, heapsize, l, r, k, smallest;

    a = (heap_struct *) ADIOI_Malloc((nprocs_recv+1)*sizeof(heap_struct));

    j = 0;
    for (i=0; i<nprocs; i++)
        if (count[i]) {
            a[j].off_list = &(others_req[i].offsets[start_pos[i]]);
            a[j].len_list = &(others_req[i].lens[start_pos[i]]);
            a[j].nelem = count[i];
            j++;
        }

    /* build a heap out of the first element from each list, with
       the smallest element of the heap at the root */

    heapsize = nprocs_recv;
    for (i=heapsize/2 - 1; i>=0; i--) {
        /* Heapify(a, i, heapsize); Algorithm from Cormen et al. pg. 143
           modified for a heap with smallest element at root. I have
           removed the recursion so that there are no function calls.
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -