📄 ad_bgl_wrcoll.c
字号:
Function calls are too expensive. */ k = i; while (1) { l = 2*(k+1) - 1; r = 2*(k+1); if ((l < heapsize) && (*(a[l].off_list) < *(a[k].off_list))) smallest = l; else smallest = k; if ((r < heapsize) && (*(a[r].off_list) < *(a[smallest].off_list))) smallest = r; if (smallest != k) { tmp.off_list = a[k].off_list; tmp.len_list = a[k].len_list; tmp.nelem = a[k].nelem; a[k].off_list = a[smallest].off_list; a[k].len_list = a[smallest].len_list; a[k].nelem = a[smallest].nelem; a[smallest].off_list = tmp.off_list; a[smallest].len_list = tmp.len_list; a[smallest].nelem = tmp.nelem; k = smallest; } else break; } } for (i=0; i<total_elements; i++) { /* extract smallest element from heap, i.e. the root */ srt_off[i] = *(a[0].off_list); srt_len[i] = *(a[0].len_list); (a[0].nelem)--; if (!a[0].nelem) { a[0].off_list = a[heapsize-1].off_list; a[0].len_list = a[heapsize-1].len_list; a[0].nelem = a[heapsize-1].nelem; heapsize--; } else { (a[0].off_list)++; (a[0].len_list)++; } /* Heapify(a, 0, heapsize); */ k = 0; while (1) { l = 2*(k+1) - 1; r = 2*(k+1); if ((l < heapsize) && (*(a[l].off_list) < *(a[k].off_list))) smallest = l; else smallest = k; if ((r < heapsize) && (*(a[r].off_list) < *(a[smallest].off_list))) smallest = r; if (smallest != k) { tmp.off_list = a[k].off_list; tmp.len_list = a[k].len_list; tmp.nelem = a[k].nelem; a[k].off_list = a[smallest].off_list; a[k].len_list = a[smallest].len_list; a[k].nelem = a[smallest].nelem; a[smallest].off_list = tmp.off_list; a[smallest].len_list = tmp.len_list; a[smallest].nelem = tmp.nelem; k = smallest; } else break; } } ADIOI_Free(a);}static void ADIOI_W_Exchange_data_alltoallv( ADIO_File fd, void *buf, char *write_buf, /* 1 */ ADIOI_Flatlist_node *flat_buf, ADIO_Offset *offset_list, int *len_list, int *send_size, int *recv_size, ADIO_Offset off, int size, /* 2 */ int *count, int *start_pos, int *partial_recv, int *sent_to_proc, int nprocs, int myrank, int buftype_is_contig, int contig_access_count, ADIO_Offset min_st_offset, ADIO_Offset fd_size, ADIO_Offset *fd_start, ADIO_Offset *fd_end, ADIOI_Access *others_req, int *send_buf_idx, int *curr_to_proc, /* 3 */ int *done_to_proc, int *hole, /* 4 */ int iter, MPI_Aint buftype_extent, int *buf_idx, int *error_code){ int i, j, k=0, tmp=0, nprocs_recv, nprocs_send, erri, *tmp_len, err; char **send_buf = NULL; MPI_Request *requests, *send_req; MPI_Datatype recv_type; MPI_Status *statuses, status; int rtail, stail; char *sbuf_ptr, *to_ptr; int len; int *sdispls, *rdispls; char *all_recv_buf, *all_send_buf; int *srt_len, sum; ADIO_Offset *srt_off; static char myname[] = "ADIOI_W_EXCHANGE_DATA"; /* exchange recv_size info so that each process knows how much to send to whom. */ MPI_Alltoall(recv_size, 1, MPI_INT, send_size, 1, MPI_INT, fd->comm); nprocs_recv = 0; for (i=0; i<nprocs; i++) if (recv_size[i]) { nprocs_recv++; } nprocs_send = 0; for (i=0; i<nprocs; i++) if (send_size[i]) { nprocs_send++; } /* receiver side data structures */ rdispls = (int *) ADIOI_Malloc( nprocs * sizeof(int) ); rtail = 0; for (i=0; i<nprocs; i++) { rdispls[i] = rtail; rtail += recv_size[i]; } /* data buffer */ all_recv_buf = (char *) ADIOI_Malloc( rtail ); /* sender side data structures */ sdispls = (int *) ADIOI_Malloc( nprocs * sizeof(int) ); stail = 0; for (i=0; i<nprocs; i++) { sdispls[i] = stail; stail += send_size[i]; } /* data buffer */ all_send_buf = (char *) ADIOI_Malloc( stail ); if (buftype_is_contig) { for (i=0; i<nprocs; i++) { if (send_size[i]) { sbuf_ptr = all_send_buf + sdispls[i]; memcpy( sbuf_ptr, buf + buf_idx[i], send_size[i] ); buf_idx[i] += send_size[i]; } } } else { send_buf = (char **) ADIOI_Malloc( nprocs * sizeof(char *) ); for (i=0; i<nprocs; i++) send_buf[i] = all_send_buf + sdispls[i]; ADIOI_Fill_send_buffer_nosend(fd, buf, flat_buf, send_buf, offset_list, len_list, send_size, send_req, sent_to_proc, nprocs, myrank, contig_access_count, min_st_offset, fd_size, fd_start, fd_end, send_buf_idx, curr_to_proc, done_to_proc, iter, buftype_extent); } /* alltoallv */ MPI_Alltoallv( all_send_buf, send_size, sdispls, MPI_BYTE, all_recv_buf, recv_size, rdispls, MPI_BYTE, fd->comm ); /* data sieving pre-read */ /* To avoid a read-modify-write, check if there are holes in the data to be written. For this, merge the (sorted) offset lists others_req using a heap-merge. */ sum = 0; for (i=0; i<nprocs; i++) sum += count[i]; srt_off = (ADIO_Offset *) ADIOI_Malloc((sum+1)*sizeof(ADIO_Offset)); srt_len = (int *) ADIOI_Malloc((sum+1)*sizeof(int)); ADIOI_Heap_merge(others_req, count, srt_off, srt_len, start_pos, nprocs, nprocs_recv, sum); /* check if there are any holes */ *hole = 0; /* See if there are holes before the first request or after the last request*/ if((srt_off[0] > off) || ((srt_off[sum-1] + srt_len[sum-1]) < (off + size))) { *hole = 1; } else /* See if there are holes between the requests, if there are more than one */ for (i=0; i<sum-1; i++) if (srt_off[i]+srt_len[i] < srt_off[i+1]) { *hole = 1; break; } ADIOI_Free(srt_off); ADIOI_Free(srt_len); if (nprocs_recv) { if (*hole) { ADIO_ReadContig(fd, write_buf, size, MPI_BYTE, ADIO_EXPLICIT_OFFSET, off, &status, &err); /* --BEGIN ERROR HANDLING-- */ if (err != MPI_SUCCESS) { *error_code = MPIO_Err_create_code(err, MPIR_ERR_RECOVERABLE, myname, __LINE__, MPI_ERR_IO, "**ioRMWrdwr", 0); return; } /* --END ERROR HANDLING-- */ } } /* scater all_recv_buf into 4M cb_buffer */ tmp_len = (int *) ADIOI_Malloc(nprocs*sizeof(int)); for (i=0; i<nprocs; i++) { if (recv_size[i]) { if (partial_recv[i]) { k = start_pos[i] + count[i] - 1; tmp_len[i] = others_req[i].lens[k]; others_req[i].lens[k] = partial_recv[i]; } sbuf_ptr = all_recv_buf + rdispls[i]; for (j=0; j<count[i]; j++) { to_ptr = (char *)( others_req[i].mem_ptrs[ start_pos[i]+j ] ); len = others_req[i].lens[ start_pos[i]+j ] ; memcpy( to_ptr, sbuf_ptr, len ); sbuf_ptr += len; } /* restore */ if (partial_recv[i]) { k = start_pos[i] + count[i] - 1; others_req[i].lens[k] = tmp_len[i]; } } } ADIOI_Free( tmp_len ); ADIOI_Free( all_send_buf ); ADIOI_Free( all_recv_buf ); ADIOI_Free(sdispls); ADIOI_Free(rdispls); return; } static void ADIOI_Fill_send_buffer_nosend(ADIO_File fd, void *buf, ADIOI_Flatlist_node *flat_buf, char **send_buf, ADIO_Offset *offset_list, int *len_list, int *send_size, MPI_Request *requests, int *sent_to_proc, int nprocs, int myrank, int contig_access_count, ADIO_Offset min_st_offset, ADIO_Offset fd_size, ADIO_Offset *fd_start, ADIO_Offset *fd_end, int *send_buf_idx, int *curr_to_proc, int *done_to_proc, int iter, MPI_Aint buftype_extent){/* this function is only called if buftype is not contig */ int i, p, flat_buf_idx, size; int flat_buf_sz, buf_incr, size_in_buf, jj, n_buftypes; ADIO_Offset off, len, rem_len, user_buf_idx;/* curr_to_proc[p] = amount of data sent to proc. p that has already been accounted for so far done_to_proc[p] = amount of data already sent to proc. p in previous iterations user_buf_idx = current location in user buffer send_buf_idx[p] = current location in send_buf of proc. p */ for (i=0; i < nprocs; i++) { send_buf_idx[i] = curr_to_proc[i] = 0; done_to_proc[i] = sent_to_proc[i]; } jj = 0; user_buf_idx = flat_buf->indices[0]; flat_buf_idx = 0; n_buftypes = 0; flat_buf_sz = flat_buf->blocklens[0]; /* flat_buf_idx = current index into flattened buftype flat_buf_sz = size of current contiguous component in flattened buf */ for (i=0; i<contig_access_count; i++) { off = offset_list[i]; rem_len = (ADIO_Offset) len_list[i]; /*this request may span the file domains of more than one process*/ while (rem_len != 0) { len = rem_len; /* NOTE: len value is modified by ADIOI_Calc_aggregator() to be no * longer than the single region that processor "p" is responsible * for. */ p = ADIOI_BGL_Calc_aggregator(fd, off, min_st_offset, &len, fd_size, fd_start, fd_end); if (send_buf_idx[p] < send_size[p]) { if (curr_to_proc[p]+len > done_to_proc[p]) { if (done_to_proc[p] > curr_to_proc[p]) { size = (int)ADIOI_MIN(curr_to_proc[p] + len - done_to_proc[p], send_size[p]-send_buf_idx[p]); buf_incr = done_to_proc[p] - curr_to_proc[p]; ADIOI_BUF_INCR buf_incr = (int)(curr_to_proc[p] + len - done_to_proc[p]); curr_to_proc[p] = done_to_proc[p] + size; ADIOI_BUF_COPY } else { size = (int)ADIOI_MIN(len,send_size[p]-send_buf_idx[p]); buf_incr = (int)len; curr_to_proc[p] += size; ADIOI_BUF_COPY } /* moved to alltoallv */ /* if (send_buf_idx[p] == send_size[p]) { MPI_Isend(send_buf[p], send_size[p], MPI_BYTE, p, myrank+p+100*iter, fd->comm, requests+jj); jj++; } */ } else { curr_to_proc[p] += (int)len; buf_incr = (int)len; ADIOI_BUF_INCR } } else { buf_incr = (int)len; ADIOI_BUF_INCR } off += len; rem_len -= len; } } for (i=0; i < nprocs; i++) if (send_size[i]) sent_to_proc[i] = curr_to_proc[i];}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -