⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 ad_read_coll.c

📁 fortran并行计算包
💻 C
📖 第 1 页 / 共 3 页
字号:
	    while (!flag) {		n_filetypes++;		for (i=0; i<flat_file->count; i++) {		    if (disp + flat_file->indices[i] + 			(ADIO_Offset) n_filetypes*filetype_extent + 			flat_file->blocklens[i] >= offset) 		    {			st_index = i;			frd_size = (int) (disp + flat_file->indices[i] + 			    (ADIO_Offset) n_filetypes*filetype_extent			        + flat_file->blocklens[i] - offset);			flag = 1;			break;		    }		}	    }	}	else {	    n_etypes_in_filetype = filetype_size/etype_size;	    n_filetypes = (int) (offset / n_etypes_in_filetype);	    etype_in_filetype = (int) (offset % n_etypes_in_filetype);	    size_in_filetype = etype_in_filetype * etype_size; 	    sum = 0;	    for (i=0; i<flat_file->count; i++) {		sum += flat_file->blocklens[i];		if (sum > size_in_filetype) {		    st_index = i;		    frd_size = sum - size_in_filetype;		    abs_off_in_filetype = flat_file->indices[i] +			size_in_filetype - (sum - flat_file->blocklens[i]);		    break;		}	    }	    /* abs. offset in bytes in the file */	    offset = disp + (ADIO_Offset) n_filetypes*filetype_extent + 		abs_off_in_filetype;	}         /* calculate how much space to allocate for offset_list, len_list */	old_frd_size = frd_size;	contig_access_count = i = 0;	j = st_index;	bufsize = buftype_size * bufcount;	frd_size = ADIOI_MIN(frd_size, bufsize);	while (i < bufsize) {	    if (frd_size) contig_access_count++;	    i += frd_size;	    j = (j + 1) % flat_file->count;	    frd_size = ADIOI_MIN(flat_file->blocklens[j], bufsize-i);	}        /* allocate space for offset_list and len_list */	*offset_list_ptr = (ADIO_Offset *)	         ADIOI_Malloc((contig_access_count+1)*sizeof(ADIO_Offset));  	*len_list_ptr = (int *) ADIOI_Malloc((contig_access_count+1)*sizeof(int));        /* +1 to avoid a 0-size malloc */	offset_list = *offset_list_ptr;	len_list = *len_list_ptr;      /* find start offset, end offset, and fill in offset_list and len_list */	*start_offset_ptr = offset; /* calculated above */	i = k = 0;	j = st_index;	off = offset;	frd_size = ADIOI_MIN(old_frd_size, bufsize);	while (i < bufsize) {	    if (frd_size) {		offset_list[k] = off;		len_list[k] = frd_size;		k++;	    }	    i += frd_size;	    end_offset = off + frd_size - 1;     /* Note: end_offset points to the last byte-offset that will be accessed.         e.g., if start_offset=0 and 100 bytes to be read, end_offset=99*/	    if (off + frd_size < disp + flat_file->indices[j] +		flat_file->blocklens[j] + 		(ADIO_Offset) n_filetypes*filetype_extent)	    {		off += frd_size;		/* did not reach end of contiguous block in filetype.		 * no more I/O needed. off is incremented by frd_size. 		 */	    }	    else {		if (j < (flat_file->count - 1)) j++;		else {		    /* hit end of flattened filetype; 		     * start at beginning again 		     */		    j = 0;		    n_filetypes++;		}		off = disp + flat_file->indices[j] + 		    (ADIO_Offset) n_filetypes*filetype_extent;		frd_size = ADIOI_MIN(flat_file->blocklens[j], bufsize-i);	    }	}	/* update file pointer */	if (file_ptr_type == ADIO_INDIVIDUAL) fd->fp_ind = off;	*contig_access_count_ptr = contig_access_count;	*end_offset_ptr = end_offset;    }}static void ADIOI_Read_and_exch(ADIO_File fd, void *buf, MPI_Datatype			 datatype, int nprocs,			 int myrank, ADIOI_Access			 *others_req, ADIO_Offset *offset_list,			 int *len_list, int contig_access_count, ADIO_Offset                         min_st_offset, ADIO_Offset fd_size,			 ADIO_Offset *fd_start, ADIO_Offset *fd_end,                         int *buf_idx, int *error_code){/* Read in sizes of no more than coll_bufsize, an info parameter.   Send data to appropriate processes.    Place recd. data in user buf.   The idea is to reduce the amount of extra memory required for   collective I/O. If all data were read all at once, which is much   easier, it would require temp space more than the size of user_buf,   which is often unacceptable. For example, to read a distributed   array from a file, where each local array is 8Mbytes, requiring   at least another 8Mbytes of temp space is unacceptable. */    int i, j, m, size, ntimes, max_ntimes, buftype_is_contig;    ADIO_Offset st_loc=-1, end_loc=-1, off, done, real_off, req_off;    char *read_buf = NULL, *tmp_buf;    int *curr_offlen_ptr, *count, *send_size, *recv_size;    int *partial_send, *recd_from_proc, *start_pos, for_next_iter;    int real_size, req_len, flag, for_curr_iter, rank;    MPI_Status status;    ADIOI_Flatlist_node *flat_buf=NULL;    MPI_Aint buftype_extent;    int coll_bufsize;    *error_code = MPI_SUCCESS;  /* changed below if error */    /* only I/O errors are currently reported */    /* calculate the number of reads of size coll_bufsize   to be done by each process and the max among all processes.   That gives the no. of communication phases as well.   coll_bufsize is obtained from the hints object. */    coll_bufsize = fd->hints->cb_buffer_size;    /* grab some initial values for st_loc and end_loc */    for (i=0; i < nprocs; i++) {	if (others_req[i].count) {	    st_loc = others_req[i].offsets[0];	    end_loc = others_req[i].offsets[0];	    break;	}    }    /* now find the real values */    for (i=0; i < nprocs; i++)	for (j=0; j<others_req[i].count; j++) {	    st_loc = ADIOI_MIN(st_loc, others_req[i].offsets[j]);	    end_loc = ADIOI_MAX(end_loc, (others_req[i].offsets[j]					  + others_req[i].lens[j] - 1));	}    /* calculate ntimes, the number of times this process must perform I/O     * operations in order to complete all the requests it has received.     * the need for multiple I/O operations comes from the restriction that     * we only use coll_bufsize bytes of memory for internal buffering.     */    if ((st_loc==-1) && (end_loc==-1)) {	/* this process does no I/O. */	ntimes = 0;    }    else {	/* ntimes=ceiling_div(end_loc - st_loc + 1, coll_bufsize)*/	ntimes = (int) ((end_loc - st_loc + coll_bufsize)/coll_bufsize);    }    MPI_Allreduce(&ntimes, &max_ntimes, 1, MPI_INT, MPI_MAX, fd->comm);     if (ntimes) read_buf = (char *) ADIOI_Malloc(coll_bufsize);    curr_offlen_ptr = (int *) ADIOI_Calloc(nprocs, sizeof(int));     /* its use is explained below. calloc initializes to 0. */    count = (int *) ADIOI_Malloc(nprocs * sizeof(int));    /* to store count of how many off-len pairs per proc are satisfied       in an iteration. */    partial_send = (int *) ADIOI_Calloc(nprocs, sizeof(int));    /* if only a portion of the last off-len pair is sent to a process        in a particular iteration, the length sent is stored here.       calloc initializes to 0. */    send_size = (int *) ADIOI_Malloc(nprocs * sizeof(int));    /* total size of data to be sent to each proc. in an iteration */    recv_size = (int *) ADIOI_Malloc(nprocs * sizeof(int));    /* total size of data to be recd. from each proc. in an iteration.       Of size nprocs so that I can use MPI_Alltoall later. */    recd_from_proc = (int *) ADIOI_Calloc(nprocs, sizeof(int));    /* amount of data recd. so far from each proc. Used in       ADIOI_Fill_user_buffer. initialized to 0 here. */    start_pos = (int *) ADIOI_Malloc(nprocs*sizeof(int));    /* used to store the starting value of curr_offlen_ptr[i] in        this iteration */    ADIOI_Datatype_iscontig(datatype, &buftype_is_contig);    if (!buftype_is_contig) {	ADIOI_Flatten_datatype(datatype);	flat_buf = ADIOI_Flatlist;        while (flat_buf->type != datatype) flat_buf = flat_buf->next;    }    MPI_Type_extent(datatype, &buftype_extent);    done = 0;    off = st_loc;    for_curr_iter = for_next_iter = 0;    MPI_Comm_rank(fd->comm, &rank);    for (m=0; m<ntimes; m++) {       /* read buf of size coll_bufsize (or less) */       /* go through all others_req and check if any are satisfied          by the current read */       /* since MPI guarantees that displacements in filetypes are in           monotonically nondecreasing order, I can maintain a pointer	  (curr_offlen_ptr) to           current off-len pair for each process in others_req and scan          further only from there. There is still a problem of filetypes          such as:  (1, 2, 3 are not process nos. They are just numbers for          three chunks of data, specified by a filetype.)                   1  -------!--                   2    -----!----                   3       --!-----          where ! indicates where the current read_size limitation cuts           through the filetype.  I resolve this by reading up to !, but          filling the communication buffer only for 1. I copy the portion          left over for 2 into a tmp_buf for use in the next	  iteration. i.e., 2 and 3 will be satisfied in the next	  iteration. This simplifies filling in the user's buf at the	  other end, as only one off-len pair with incomplete data	  will be sent. I also don't need to send the individual	  offsets and lens along with the data, as the data is being	  sent in a particular order. */           /* off = start offset in the file for the data actually read in                    this iteration              size = size of data read corresponding to off             real_off = off minus whatever data was retained in memory from                  previous iteration for cases like 2, 3 illustrated above             real_size = size plus the extra corresponding to real_off             req_off = off in file for a particular contiguous request                        minus what was satisfied in previous iteration             req_size = size corresponding to req_off */	size = (int) (ADIOI_MIN(coll_bufsize, end_loc-st_loc+1-done)); 	real_off = off - for_curr_iter;	real_size = size + for_curr_iter;	for (i=0; i<nprocs; i++) count[i] = send_size[i] = 0;	for_next_iter = 0;	for (i=0; i<nprocs; i++) {	    /* FPRINTF(stderr, "rank %d, i %d, others_count %d\n", rank, i, others_req[i].count); */	    if (others_req[i].count) {		start_pos[i] = curr_offlen_ptr[i];		for (j=curr_offlen_ptr[i]; j<others_req[i].count;		     j++) {		    if (partial_send[i]) {			/* this request may have been partially			   satisfied in the previous iteration. */			req_off = others_req[i].offsets[j] +			    partial_send[i];                         req_len = others_req[i].lens[j] -			    partial_send[i];			partial_send[i] = 0;			/* modify the off-len pair to reflect this change */			others_req[i].offsets[j] = req_off;			others_req[i].lens[j] = req_len;		    }		    else {			req_off = others_req[i].offsets[j];                        req_len = others_req[i].lens[j];		    }		    if (req_off < real_off + real_size) {			count[i]++;			MPI_Address(read_buf+req_off-real_off,                                &(others_req[i].mem_ptrs[j]));			send_size[i] += (int)(ADIOI_MIN(real_off + (ADIO_Offset)real_size - 						  req_off, req_len));			if (real_off+real_size-req_off < req_len) {			    partial_send[i] = (int) (real_off+real_size-						     req_off);			    if ((j+1 < others_req[i].count) &&                                  (others_req[i].offsets[j+1] <                                      real_off+real_size)) { 				/* this is the case illustrated in the				   figure above. */				for_next_iter = (int) (ADIOI_MAX(for_next_iter,					  real_off + real_size -                                              others_req[i].offsets[j+1])); 				/* max because it must cover requests 				   from different processes */			    }			    break;			}		    }		    else break;		}		curr_offlen_ptr[i] = j;	    }	}	flag = 0;	for (i=0; i<nprocs; i++)	    if (count[i]) flag = 1;

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -