⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 ad_read_coll.c

📁 MPICH是MPI的重要研究,提供了一系列的接口函数,为并行计算的实现提供了编程环境.
💻 C
📖 第 1 页 / 共 3 页
字号:
	    if (*error_code != MPI_SUCCESS) return;	}		for_curr_iter = for_next_iter;	#ifdef PROFILE        MPE_Log_event(7, 0, "start communication");#endif	ADIOI_R_Exchange_data(fd, buf, flat_buf, offset_list, len_list,			    send_size, recv_size, count,        			    start_pos, partial_send, recd_from_proc, nprocs,			    myrank, 			    buftype_is_contig, contig_access_count,			    min_st_offset, fd_size, fd_start, fd_end,			    others_req,                             m, buftype_extent, buf_idx); #ifdef PROFILE        MPE_Log_event(8, 0, "end communication");#endif	if (for_next_iter) {	    tmp_buf = (char *) ADIOI_Malloc(for_next_iter);	    memcpy(tmp_buf, read_buf+real_size-for_next_iter, for_next_iter);	    ADIOI_Free(read_buf);	    read_buf = (char *) ADIOI_Malloc(for_next_iter+coll_bufsize);	    memcpy(read_buf, tmp_buf, for_next_iter);	    ADIOI_Free(tmp_buf);	}	off += size;	done += size;    }    for (i=0; i<nprocs; i++) count[i] = send_size[i] = 0;#ifdef PROFILE        MPE_Log_event(7, 0, "start communication");#endif    for (m=ntimes; m<max_ntimes; m++) /* nothing to send, but check for recv. */	ADIOI_R_Exchange_data(fd, buf, flat_buf, offset_list, len_list,			    send_size, recv_size, count, 			    start_pos, partial_send, recd_from_proc, nprocs,			    myrank, 			    buftype_is_contig, contig_access_count,			    min_st_offset, fd_size, fd_start, fd_end,			    others_req, m,                            buftype_extent, buf_idx); #ifdef PROFILE        MPE_Log_event(8, 0, "end communication");#endif    if (ntimes) ADIOI_Free(read_buf);    ADIOI_Free(curr_offlen_ptr);    ADIOI_Free(count);    ADIOI_Free(partial_send);    ADIOI_Free(send_size);    ADIOI_Free(recv_size);    ADIOI_Free(recd_from_proc);    ADIOI_Free(start_pos);}static void ADIOI_R_Exchange_data(ADIO_File fd, void *buf, ADIOI_Flatlist_node			 *flat_buf, ADIO_Offset *offset_list, int                         *len_list, int *send_size, int *recv_size,			 int *count, int *start_pos, int *partial_send, 			 int *recd_from_proc, int nprocs, 			 int myrank, int			 buftype_is_contig, int contig_access_count,			 ADIO_Offset min_st_offset, ADIO_Offset fd_size,			 ADIO_Offset *fd_start, ADIO_Offset *fd_end, 			 ADIOI_Access *others_req,                          int iter, MPI_Aint buftype_extent, int *buf_idx){    int i, j, k=0, tmp=0, nprocs_recv, nprocs_send;    char **recv_buf = NULL;     MPI_Request *requests;    MPI_Datatype send_type;    MPI_Status *statuses;/* exchange send_size info so that each process knows how much to   receive from whom and how much memory to allocate. */    MPI_Alltoall(send_size, 1, MPI_INT, recv_size, 1, MPI_INT, fd->comm);    nprocs_recv = 0;    for (i=0; i < nprocs; i++) if (recv_size[i]) nprocs_recv++;    nprocs_send = 0;    for (i=0; i<nprocs; i++) if (send_size[i]) nprocs_send++;    requests = (MPI_Request *)	ADIOI_Malloc((nprocs_send+nprocs_recv+1)*sizeof(MPI_Request));/* +1 to avoid a 0-size malloc *//* post recvs. if buftype_is_contig, data can be directly recd. into   user buf at location given by buf_idx. else use recv_buf. */    if (buftype_is_contig) {	j = 0;	for (i=0; i < nprocs; i++) 	    if (recv_size[i]) {		MPI_Irecv(((char *) buf) + buf_idx[i], recv_size[i], 		  MPI_BYTE, i, myrank+i+100*iter, fd->comm, requests+j);		j++;		buf_idx[i] += recv_size[i];	    }    }    else {/* allocate memory for recv_buf and post receives */	recv_buf = (char **) ADIOI_Malloc(nprocs * sizeof(char*));	for (i=0; i < nprocs; i++) 	    if (recv_size[i]) recv_buf[i] =                                   (char *) ADIOI_Malloc(recv_size[i]);	    j = 0;	    for (i=0; i < nprocs; i++) 		if (recv_size[i]) {		    MPI_Irecv(recv_buf[i], recv_size[i], MPI_BYTE, i, 			      myrank+i+100*iter, fd->comm, requests+j);		    j++;		    /* FPRINTF(stderr, "node %d, recv_size %d, tag %d \n", 		       myrank, recv_size[i], myrank+i+100*iter); */		}    }/* create derived datatypes and send data */    j = 0;    for (i=0; i<nprocs; i++) {	if (send_size[i]) {/* take care if the last off-len pair is a partial send */	    if (partial_send[i]) {		k = start_pos[i] + count[i] - 1;		tmp = others_req[i].lens[k];		others_req[i].lens[k] = partial_send[i];	    }	    MPI_Type_hindexed(count[i],                  &(others_req[i].lens[start_pos[i]]),	            &(others_req[i].mem_ptrs[start_pos[i]]), 			 MPI_BYTE, &send_type);	    /* absolute displacement; use MPI_BOTTOM in send */	    MPI_Type_commit(&send_type);	    MPI_Isend(MPI_BOTTOM, 1, send_type, i, myrank+i+100*iter,		      fd->comm, requests+nprocs_recv+j);	    MPI_Type_free(&send_type);	    if (partial_send[i]) others_req[i].lens[k] = tmp;	    j++;	}    }    statuses = (MPI_Status *) ADIOI_Malloc((nprocs_send+nprocs_recv+1) * \                                     sizeof(MPI_Status));      /* +1 to avoid a 0-size malloc */    /* wait on the receives */    if (nprocs_recv) {#ifdef NEEDS_MPI_TEST	j = 0;	while (!j) MPI_Testall(nprocs_recv, requests, &j, statuses);#else	MPI_Waitall(nprocs_recv, requests, statuses);#endif	/* if noncontiguous, to the copies from the recv buffers */	if (!buftype_is_contig) 	    ADIOI_Fill_user_buffer(fd, buf, flat_buf, recv_buf,				   offset_list, len_list, recv_size, 				   requests, statuses, recd_from_proc, 				   nprocs, contig_access_count,				   min_st_offset, fd_size, fd_start, fd_end,				   buftype_extent);    }    /* wait on the sends*/    MPI_Waitall(nprocs_send, requests+nprocs_recv, statuses+nprocs_recv);    ADIOI_Free(statuses);    ADIOI_Free(requests);    if (!buftype_is_contig) {	for (i=0; i < nprocs; i++) 	    if (recv_size[i]) ADIOI_Free(recv_buf[i]);	ADIOI_Free(recv_buf);    }}#define ADIOI_BUF_INCR \{ \    while (buf_incr) { \	size_in_buf = ADIOI_MIN(buf_incr, flat_buf_sz); \	user_buf_idx += size_in_buf; \	flat_buf_sz -= size_in_buf; \	if (!flat_buf_sz) { \            if (flat_buf_idx < (flat_buf->count - 1)) flat_buf_idx++; \            else { \                flat_buf_idx = 0; \                n_buftypes++; \            } \            user_buf_idx = flat_buf->indices[flat_buf_idx] + \                              n_buftypes*buftype_extent; \	    flat_buf_sz = flat_buf->blocklens[flat_buf_idx]; \	} \	buf_incr -= size_in_buf; \    } \}#define ADIOI_BUF_COPY \{ \    while (size) { \	size_in_buf = ADIOI_MIN(size, flat_buf_sz); \	memcpy(((char *) buf) + user_buf_idx, \	       &(recv_buf[p][recv_buf_idx[p]]), size_in_buf); \	recv_buf_idx[p] += size_in_buf; \	user_buf_idx += size_in_buf; \	flat_buf_sz -= size_in_buf; \	if (!flat_buf_sz) { \            if (flat_buf_idx < (flat_buf->count - 1)) flat_buf_idx++; \            else { \                flat_buf_idx = 0; \                n_buftypes++; \            } \            user_buf_idx = flat_buf->indices[flat_buf_idx] + \                              n_buftypes*buftype_extent; \	    flat_buf_sz = flat_buf->blocklens[flat_buf_idx]; \	} \	size -= size_in_buf; \	buf_incr -= size_in_buf; \    } \    ADIOI_BUF_INCR \}static void ADIOI_Fill_user_buffer(ADIO_File fd, void *buf, ADIOI_Flatlist_node				   *flat_buf, char **recv_buf, ADIO_Offset 				   *offset_list, int *len_list, 				   int *recv_size, 				   MPI_Request *requests, MPI_Status *statuses,				   int *recd_from_proc, int nprocs,				   int contig_access_count, 				   ADIO_Offset min_st_offset, 				   ADIO_Offset fd_size, ADIO_Offset *fd_start, 				   ADIO_Offset *fd_end,				   MPI_Aint buftype_extent){/* this function is only called if buftype is not contig */    int i, p, flat_buf_idx, size, buf_incr;    int flat_buf_sz, size_in_buf, n_buftypes;    ADIO_Offset off, len, rem_len, user_buf_idx;    int *curr_from_proc, *done_from_proc, *recv_buf_idx;/*  curr_from_proc[p] = amount of data recd from proc. p that has already                        been accounted for so far    done_from_proc[p] = amount of data already recd from proc. p and                         filled into user buffer in previous iterations    user_buf_idx = current location in user buffer     recv_buf_idx[p] = current location in recv_buf of proc. p  */    curr_from_proc = (int *) ADIOI_Malloc(nprocs * sizeof(int));    done_from_proc = (int *) ADIOI_Malloc(nprocs * sizeof(int));    recv_buf_idx   = (int *) ADIOI_Malloc(nprocs * sizeof(int));    for (i=0; i < nprocs; i++) {	recv_buf_idx[i] = curr_from_proc[i] = 0;	done_from_proc[i] = recd_from_proc[i];    }    user_buf_idx = flat_buf->indices[0];    flat_buf_idx = 0;    n_buftypes = 0;    flat_buf_sz = flat_buf->blocklens[0];    /* flat_buf_idx = current index into flattened buftype       flat_buf_sz = size of current contiguous component in                 flattened buf */    for (i=0; i<contig_access_count; i++) { 	off     = offset_list[i];	rem_len = (ADIO_Offset) len_list[i];	/* this request may span the file domains of more than one process */	while (rem_len != 0) {	    len = rem_len;	    /* NOTE: len value is modified by ADIOI_Calc_aggregator() to be no	     * longer than the single region that processor "p" is responsible	     * for.	     */	    p = ADIOI_Calc_aggregator(fd,				      off,				      min_st_offset,				      &len,				      fd_size,				      fd_start,				      fd_end);	    if (recv_buf_idx[p] < recv_size[p]) {		if (curr_from_proc[p]+len > done_from_proc[p]) {		    if (done_from_proc[p] > curr_from_proc[p]) {			size = (int)ADIOI_MIN(curr_from_proc[p] + len - 			      done_from_proc[p], recv_size[p]-recv_buf_idx[p]);			buf_incr = done_from_proc[p] - curr_from_proc[p];			ADIOI_BUF_INCR			buf_incr = (int)(curr_from_proc[p]+len-done_from_proc[p]);			curr_from_proc[p] = done_from_proc[p] + size;			ADIOI_BUF_COPY		    }		    else {			size = (int)ADIOI_MIN(len,recv_size[p]-recv_buf_idx[p]);			buf_incr = (int)len;			curr_from_proc[p] += size;			ADIOI_BUF_COPY		    }		}		else {		    curr_from_proc[p] += (int)len;		    buf_incr = (int)len;		    ADIOI_BUF_INCR		}	    }	    else {		buf_incr = (int)len;		ADIOI_BUF_INCR	    }	    off     += len;	    rem_len -= len;	}    }    for (i=0; i < nprocs; i++) 	if (recv_size[i]) recd_from_proc[i] = curr_from_proc[i];    ADIOI_Free(curr_from_proc);    ADIOI_Free(done_from_proc);    ADIOI_Free(recv_buf_idx);}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -