📄 ad_write_coll.c
字号:
j++; buf_idx[i] += send_size[i]; } } else if (nprocs_send) { /* buftype is not contig */ send_buf = (char **) ADIOI_Malloc(nprocs*sizeof(char*)); for (i=0; i < nprocs; i++) if (send_size[i]) send_buf[i] = (char *) ADIOI_Malloc(send_size[i]); ADIOI_Fill_send_buffer(fd, buf, flat_buf, send_buf, offset_list, len_list, send_size, requests+nprocs_recv, sent_to_proc, nprocs, myrank, contig_access_count, min_st_offset, fd_size, fd_start, fd_end, send_buf_idx, curr_to_proc, done_to_proc, iter, buftype_extent); /* the send is done in ADIOI_Fill_send_buffer */ } for (i=0; i<nprocs_recv; i++) MPI_Type_free(recv_types+i); ADIOI_Free(recv_types); statuses = (MPI_Status *) ADIOI_Malloc((nprocs_send+nprocs_recv+1) * \ sizeof(MPI_Status)); /* +1 to avoid a 0-size malloc */#ifdef NEEDS_MPI_TEST i = 0; while (!i) MPI_Testall(nprocs_send+nprocs_recv, requests, &i, statuses);#else MPI_Waitall(nprocs_send+nprocs_recv, requests, statuses);#endif ADIOI_Free(statuses); ADIOI_Free(requests); if (!buftype_is_contig && nprocs_send) { for (i=0; i < nprocs; i++) if (send_size[i]) ADIOI_Free(send_buf[i]); ADIOI_Free(send_buf); }}#define ADIOI_BUF_INCR \{ \ while (buf_incr) { \ size_in_buf = ADIOI_MIN(buf_incr, flat_buf_sz); \ user_buf_idx += size_in_buf; \ flat_buf_sz -= size_in_buf; \ if (!flat_buf_sz) { \ if (flat_buf_idx < (flat_buf->count - 1)) flat_buf_idx++; \ else { \ flat_buf_idx = 0; \ n_buftypes++; \ } \ user_buf_idx = flat_buf->indices[flat_buf_idx] + \ n_buftypes*buftype_extent; \ flat_buf_sz = flat_buf->blocklens[flat_buf_idx]; \ } \ buf_incr -= size_in_buf; \ } \}#define ADIOI_BUF_COPY \{ \ while (size) { \ size_in_buf = ADIOI_MIN(size, flat_buf_sz); \ memcpy(&(send_buf[p][send_buf_idx[p]]), \ ((char *) buf) + user_buf_idx, size_in_buf); \ send_buf_idx[p] += size_in_buf; \ user_buf_idx += size_in_buf; \ flat_buf_sz -= size_in_buf; \ if (!flat_buf_sz) { \ if (flat_buf_idx < (flat_buf->count - 1)) flat_buf_idx++; \ else { \ flat_buf_idx = 0; \ n_buftypes++; \ } \ user_buf_idx = flat_buf->indices[flat_buf_idx] + \ n_buftypes*buftype_extent; \ flat_buf_sz = flat_buf->blocklens[flat_buf_idx]; \ } \ size -= size_in_buf; \ buf_incr -= size_in_buf; \ } \ ADIOI_BUF_INCR \}static void ADIOI_Fill_send_buffer(ADIO_File fd, void *buf, ADIOI_Flatlist_node *flat_buf, char **send_buf, ADIO_Offset *offset_list, int *len_list, int *send_size, MPI_Request *requests, int *sent_to_proc, int nprocs, int myrank, int contig_access_count, ADIO_Offset min_st_offset, ADIO_Offset fd_size, ADIO_Offset *fd_start, ADIO_Offset *fd_end, int *send_buf_idx, int *curr_to_proc, int *done_to_proc, int iter, MPI_Aint buftype_extent){/* this function is only called if buftype is not contig */ int i, p, flat_buf_idx, size; int flat_buf_sz, buf_incr, size_in_buf, jj, n_buftypes; ADIO_Offset off, len, rem_len, user_buf_idx;/* curr_to_proc[p] = amount of data sent to proc. p that has already been accounted for so far done_to_proc[p] = amount of data already sent to proc. p in previous iterations user_buf_idx = current location in user buffer send_buf_idx[p] = current location in send_buf of proc. p */ for (i=0; i < nprocs; i++) { send_buf_idx[i] = curr_to_proc[i] = 0; done_to_proc[i] = sent_to_proc[i]; } jj = 0; user_buf_idx = flat_buf->indices[0]; flat_buf_idx = 0; n_buftypes = 0; flat_buf_sz = flat_buf->blocklens[0]; /* flat_buf_idx = current index into flattened buftype flat_buf_sz = size of current contiguous component in flattened buf */ for (i=0; i<contig_access_count; i++) { off = offset_list[i]; rem_len = (ADIO_Offset) len_list[i]; /*this request may span the file domains of more than one process*/ while (rem_len != 0) { len = rem_len; /* NOTE: len value is modified by ADIOI_Calc_aggregator() to be no * longer than the single region that processor "p" is responsible * for. */ p = ADIOI_Calc_aggregator(fd, off, min_st_offset, &len, fd_size, fd_start, fd_end); if (send_buf_idx[p] < send_size[p]) { if (curr_to_proc[p]+len > done_to_proc[p]) { if (done_to_proc[p] > curr_to_proc[p]) { size = (int)ADIOI_MIN(curr_to_proc[p] + len - done_to_proc[p], send_size[p]-send_buf_idx[p]); buf_incr = done_to_proc[p] - curr_to_proc[p]; ADIOI_BUF_INCR buf_incr = (int)(curr_to_proc[p] + len - done_to_proc[p]); curr_to_proc[p] = done_to_proc[p] + size; ADIOI_BUF_COPY } else { size = (int)ADIOI_MIN(len,send_size[p]-send_buf_idx[p]); buf_incr = (int)len; curr_to_proc[p] += size; ADIOI_BUF_COPY } if (send_buf_idx[p] == send_size[p]) { MPI_Isend(send_buf[p], send_size[p], MPI_BYTE, p, myrank+p+100*iter, fd->comm, requests+jj); jj++; } } else { curr_to_proc[p] += (int)len; buf_incr = (int)len; ADIOI_BUF_INCR } } else { buf_incr = (int)len; ADIOI_BUF_INCR } off += len; rem_len -= len; } } for (i=0; i < nprocs; i++) if (send_size[i]) sent_to_proc[i] = curr_to_proc[i];}static void ADIOI_Heap_merge(ADIOI_Access *others_req, int *count, ADIO_Offset *srt_off, int *srt_len, int *start_pos, int nprocs, int nprocs_recv, int total_elements){ typedef struct { ADIO_Offset *off_list; int *len_list; int nelem; } heap_struct; heap_struct *a, tmp; int i, j, heapsize, l, r, k, smallest; a = (heap_struct *) ADIOI_Malloc((nprocs_recv+1)*sizeof(heap_struct)); j = 0; for (i=0; i<nprocs; i++) if (count[i]) { a[j].off_list = &(others_req[i].offsets[start_pos[i]]); a[j].len_list = &(others_req[i].lens[start_pos[i]]); a[j].nelem = count[i]; j++; } /* build a heap out of the first element from each list, with the smallest element of the heap at the root */ heapsize = nprocs_recv; for (i=heapsize/2 - 1; i>=0; i--) { /* Heapify(a, i, heapsize); Algorithm from Cormen et al. pg. 143 modified for a heap with smallest element at root. I have removed the recursion so that there are no function calls. Function calls are too expensive. */ k = i; while (1) { l = 2*(k+1) - 1; r = 2*(k+1); if ((l < heapsize) && (*(a[l].off_list) < *(a[k].off_list))) smallest = l; else smallest = k; if ((r < heapsize) && (*(a[r].off_list) < *(a[smallest].off_list))) smallest = r; if (smallest != k) { tmp.off_list = a[k].off_list; tmp.len_list = a[k].len_list; tmp.nelem = a[k].nelem; a[k].off_list = a[smallest].off_list; a[k].len_list = a[smallest].len_list; a[k].nelem = a[smallest].nelem; a[smallest].off_list = tmp.off_list; a[smallest].len_list = tmp.len_list; a[smallest].nelem = tmp.nelem; k = smallest; } else break; } } for (i=0; i<total_elements; i++) { /* extract smallest element from heap, i.e. the root */ srt_off[i] = *(a[0].off_list); srt_len[i] = *(a[0].len_list); (a[0].nelem)--; if (!a[0].nelem) { a[0].off_list = a[heapsize-1].off_list; a[0].len_list = a[heapsize-1].len_list; a[0].nelem = a[heapsize-1].nelem; heapsize--; } else { (a[0].off_list)++; (a[0].len_list)++; } /* Heapify(a, 0, heapsize); */ k = 0; while (1) { l = 2*(k+1) - 1; r = 2*(k+1); if ((l < heapsize) && (*(a[l].off_list) < *(a[k].off_list))) smallest = l; else smallest = k; if ((r < heapsize) && (*(a[r].off_list) < *(a[smallest].off_list))) smallest = r; if (smallest != k) { tmp.off_list = a[k].off_list; tmp.len_list = a[k].len_list; tmp.nelem = a[k].nelem; a[k].off_list = a[smallest].off_list; a[k].len_list = a[smallest].len_list; a[k].nelem = a[smallest].nelem; a[smallest].off_list = tmp.off_list; a[smallest].len_list = tmp.len_list; a[smallest].nelem = tmp.nelem; k = smallest; } else break; } } ADIOI_Free(a);}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -