⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 ad_write_coll.c

📁 fortran并行计算包
💻 C
📖 第 1 页 / 共 3 页
字号:
    curr_offlen_ptr = (int *) ADIOI_Calloc(nprocs, sizeof(int));     /* its use is explained below. calloc initializes to 0. */    count = (int *) ADIOI_Malloc(nprocs*sizeof(int));    /* to store count of how many off-len pairs per proc are satisfied       in an iteration. */    partial_recv = (int *) ADIOI_Calloc(nprocs, sizeof(int));    /* if only a portion of the last off-len pair is recd. from a process       in a particular iteration, the length recd. is stored here.       calloc initializes to 0. */    send_size = (int *) ADIOI_Malloc(nprocs*sizeof(int));    /* total size of data to be sent to each proc. in an iteration.       Of size nprocs so that I can use MPI_Alltoall later. */    recv_size = (int *) ADIOI_Malloc(nprocs*sizeof(int));    /* total size of data to be recd. from each proc. in an iteration.*/    sent_to_proc = (int *) ADIOI_Calloc(nprocs, sizeof(int));    /* amount of data sent to each proc so far. Used in       ADIOI_Fill_send_buffer. initialized to 0 here. */    send_buf_idx = (int *) ADIOI_Malloc(nprocs*sizeof(int));    curr_to_proc = (int *) ADIOI_Malloc(nprocs*sizeof(int));    done_to_proc = (int *) ADIOI_Malloc(nprocs*sizeof(int));    /* Above three are used in ADIOI_Fill_send_buffer*/    start_pos = (int *) ADIOI_Malloc(nprocs*sizeof(int));    /* used to store the starting value of curr_offlen_ptr[i] in        this iteration */    ADIOI_Datatype_iscontig(datatype, &buftype_is_contig);    if (!buftype_is_contig) {	ADIOI_Flatten_datatype(datatype);	flat_buf = ADIOI_Flatlist;        while (flat_buf->type != datatype) flat_buf = flat_buf->next;    }    MPI_Type_extent(datatype, &buftype_extent);/* I need to check if there are any outstanding nonblocking writes to   the file, which could potentially interfere with the writes taking   place in this collective write call. Since this is not likely to be   common, let me do the simplest thing possible here: Each process   completes all pending nonblocking operations before completing. */    /*ADIOI_Complete_async(error_code);    if (*error_code != MPI_SUCCESS) return;    MPI_Barrier(fd->comm);    */    done = 0;    off = st_loc;    for (m=0; m < ntimes; m++) {       /* go through all others_req and check which will be satisfied          by the current write */       /* Note that MPI guarantees that displacements in filetypes are in           monotonically nondecreasing order and that, for writes, the	  filetypes cannot specify overlapping regions in the file. This	  simplifies implementation a bit compared to reads. */          /* off = start offset in the file for the data to be written in                    this iteration              size = size of data written (bytes) corresponding to off             req_off = off in file for a particular contiguous request                        minus what was satisfied in previous iteration             req_size = size corresponding to req_off */	/* first calculate what should be communicated */	for (i=0; i < nprocs; i++) count[i] = recv_size[i] = 0;	size = (int) (ADIOI_MIN(coll_bufsize, end_loc-st_loc+1-done)); 	for (i=0; i < nprocs; i++) {	    if (others_req[i].count) {		start_pos[i] = curr_offlen_ptr[i];		for (j=curr_offlen_ptr[i]; j<others_req[i].count; j++) {		    if (partial_recv[i]) {			/* this request may have been partially			   satisfied in the previous iteration. */			req_off = others_req[i].offsets[j] +			    partial_recv[i];                         req_len = others_req[i].lens[j] -			    partial_recv[i];			partial_recv[i] = 0;			/* modify the off-len pair to reflect this change */			others_req[i].offsets[j] = req_off;			others_req[i].lens[j] = req_len;		    }		    else {			req_off = others_req[i].offsets[j];                        req_len = others_req[i].lens[j];		    }		    if (req_off < off + size) {			count[i]++;			MPI_Address(write_buf+req_off-off,                                &(others_req[i].mem_ptrs[j]));			recv_size[i] += (int)(ADIOI_MIN(off + (ADIO_Offset)size - 						  req_off, req_len));			if (off+size-req_off < req_len)			{			    partial_recv[i] = (int) (off + size - req_off);			    /* --BEGIN ERROR HANDLING-- */			    if ((j+1 < others_req[i].count) &&                                  (others_req[i].offsets[j+1] < off+size))			    { 				*error_code = MPIO_Err_create_code(MPI_SUCCESS,								   MPIR_ERR_RECOVERABLE,								   myname,								   __LINE__,								   MPI_ERR_ARG,								   "Filetype specifies overlapping write regions (which is illegal according to the MPI-2 specification)", 0);				/* allow to continue since additional				 * communication might have to occur				 */			    }			    /* --END ERROR HANDLING-- */			    break;			}		    }		    else break;		}		curr_offlen_ptr[i] = j;	    }	}		ADIOI_W_Exchange_data(fd, buf, write_buf, flat_buf, offset_list,                             len_list, send_size, recv_size, off, size, count,                             start_pos, partial_recv,                             sent_to_proc, nprocs, myrank, 			    buftype_is_contig, contig_access_count,			    min_st_offset, fd_size, fd_start, fd_end,			    others_req, send_buf_idx, curr_to_proc,                            done_to_proc, &hole, m, buftype_extent, buf_idx,			    error_code);         if (*error_code != MPI_SUCCESS) return;	flag = 0;	for (i=0; i<nprocs; i++)	    if (count[i]) flag = 1;	if (flag) {	    ADIO_WriteContig(fd, write_buf, size, MPI_BYTE, ADIO_EXPLICIT_OFFSET,                         off, &status, error_code);	    if (*error_code != MPI_SUCCESS) return;	}	off += size;	done += size;    }    for (i=0; i<nprocs; i++) count[i] = recv_size[i] = 0;    for (m=ntimes; m<max_ntimes; m++) {	/* nothing to recv, but check for send. */	ADIOI_W_Exchange_data(fd, buf, write_buf, flat_buf, offset_list,                             len_list, send_size, recv_size, off, size, count,                             start_pos, partial_recv,                             sent_to_proc, nprocs, myrank, 			    buftype_is_contig, contig_access_count,			    min_st_offset, fd_size, fd_start, fd_end,			    others_req, send_buf_idx,                             curr_to_proc, done_to_proc, &hole, m,                             buftype_extent, buf_idx, error_code);         if (*error_code != MPI_SUCCESS) return;    }    if (ntimes) ADIOI_Free(write_buf);    ADIOI_Free(curr_offlen_ptr);    ADIOI_Free(count);    ADIOI_Free(partial_recv);    ADIOI_Free(send_size);    ADIOI_Free(recv_size);    ADIOI_Free(sent_to_proc);    ADIOI_Free(start_pos);    ADIOI_Free(send_buf_idx);    ADIOI_Free(curr_to_proc);    ADIOI_Free(done_to_proc);}/* Sets error_code to MPI_SUCCESS if successful, or creates an error code * in the case of error. */static void ADIOI_W_Exchange_data(ADIO_File fd, void *buf, char *write_buf,				  ADIOI_Flatlist_node *flat_buf, ADIO_Offset 				  *offset_list, int *len_list, int *send_size, 				  int *recv_size, ADIO_Offset off, int size,				  int *count, int *start_pos,				  int *partial_recv,				  int *sent_to_proc, int nprocs, 				  int myrank, int				  buftype_is_contig, int contig_access_count,				  ADIO_Offset min_st_offset,				  ADIO_Offset fd_size,				  ADIO_Offset *fd_start, ADIO_Offset *fd_end, 				  ADIOI_Access *others_req, 				  int *send_buf_idx, int *curr_to_proc,				  int *done_to_proc, int *hole, int iter, 				  MPI_Aint buftype_extent, int *buf_idx,				  int *error_code){    int i, j, k, *tmp_len, nprocs_recv, nprocs_send, err;    char **send_buf = NULL;     MPI_Request *requests, *send_req;    MPI_Datatype *recv_types;    MPI_Status *statuses, status;    int *srt_len, sum, sum_recv;    ADIO_Offset *srt_off;    static char myname[] = "ADIOI_W_EXCHANGE_DATA";/* exchange recv_size info so that each process knows how much to   send to whom. */    MPI_Alltoall(recv_size, 1, MPI_INT, send_size, 1, MPI_INT, fd->comm);    /* create derived datatypes for recv */    nprocs_recv = 0;    for (i=0; i<nprocs; i++) if (recv_size[i]) nprocs_recv++;    recv_types = (MPI_Datatype *)	ADIOI_Malloc((nprocs_recv+1)*sizeof(MPI_Datatype)); /* +1 to avoid a 0-size malloc */    tmp_len = (int *) ADIOI_Malloc(nprocs*sizeof(int));    j = 0;    for (i=0; i<nprocs; i++) {	if (recv_size[i]) {/* take care if the last off-len pair is a partial recv */	    if (partial_recv[i]) {		k = start_pos[i] + count[i] - 1;		tmp_len[i] = others_req[i].lens[k];		others_req[i].lens[k] = partial_recv[i];	    }	    MPI_Type_hindexed(count[i],                  &(others_req[i].lens[start_pos[i]]),	             &(others_req[i].mem_ptrs[start_pos[i]]), 			 MPI_BYTE, recv_types+j);	    /* absolute displacements; use MPI_BOTTOM in recv */	    MPI_Type_commit(recv_types+j);	    j++;	}    }    /* To avoid a read-modify-write, check if there are holes in the        data to be written. For this, merge the (sorted) offset lists       others_req using a heap-merge. */    sum = 0;    for (i=0; i<nprocs; i++) sum += count[i];    srt_off = (ADIO_Offset *) ADIOI_Malloc((sum+1)*sizeof(ADIO_Offset));    srt_len = (int *) ADIOI_Malloc((sum+1)*sizeof(int));    /* +1 to avoid a 0-size malloc */    ADIOI_Heap_merge(others_req, count, srt_off, srt_len, start_pos,                     nprocs, nprocs_recv, sum);/* for partial recvs, restore original lengths */    for (i=0; i<nprocs; i++)         if (partial_recv[i]) {            k = start_pos[i] + count[i] - 1;            others_req[i].lens[k] = tmp_len[i];        }    ADIOI_Free(tmp_len);/* check if there are any holes */    *hole = 0;    for (i=0; i<sum-1; i++)	if (srt_off[i]+srt_len[i] < srt_off[i+1]) {	    *hole = 1;	    break;	}    /* In some cases (see John Bent ROMIO REQ # 835), an odd interaction     * between aggregation, nominally contiguous regions, and cb_buffer_size     * should be handled with a read-modify-write (otherwise we will write out     * more data than we receive from everyone else (inclusive), so override     * hole detection     */    if (*hole == 0) {        sum_recv=0;        for (i=0; i<nprocs; i++) {	   sum_recv += recv_size[i];	   sum_recv += partial_recv[i];        }        if (size > sum_recv) *hole = 1;    }    ADIOI_Free(srt_off);    ADIOI_Free(srt_len);    if (nprocs_recv) {	if (*hole) {	    ADIO_ReadContig(fd, write_buf, size, MPI_BYTE, 			    ADIO_EXPLICIT_OFFSET, off, &status, &err);	    /* --BEGIN ERROR HANDLING-- */	    if (err != MPI_SUCCESS) {		*error_code = MPIO_Err_create_code(err,						   MPIR_ERR_RECOVERABLE, myname,						   __LINE__, MPI_ERR_IO,						   "**ioRMWrdwr", 0);		return;	    } 	    /* --END ERROR HANDLING-- */	}    }    nprocs_send = 0;    for (i=0; i < nprocs; i++) if (send_size[i]) nprocs_send++;    if (fd->atomicity) {        /* bug fix from Wei-keng Liao and Kenin Coloma */        requests = (MPI_Request *)	    ADIOI_Malloc((nprocs_send+1)*sizeof(MPI_Request));         send_req = requests;    }    else {        requests = (MPI_Request *) 	            ADIOI_Malloc((nprocs_send+nprocs_recv+1)*sizeof(MPI_Request));         /* +1 to avoid a 0-size malloc */        /* post receives */        j = 0;        for (i=0; i<nprocs; i++) {            if (recv_size[i]) {                MPI_Irecv(MPI_BOTTOM, 1, recv_types[j], i, myrank+i+100*iter,                          fd->comm, requests+j);                j++;            }        }	send_req = requests + nprocs_recv;    }/* post sends. if buftype_is_contig, data can be directly sent from   user buf at location given by buf_idx. else use send_buf. */    if (buftype_is_contig) {	j = 0;	for (i=0; i < nprocs; i++) 	    if (send_size[i]) {		MPI_Isend(((char *) buf) + buf_idx[i], send_size[i],   		            MPI_BYTE, i,  myrank+i+100*iter, fd->comm,                                   send_req+j);

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -