⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 ad_write_coll.c

📁 fortran并行计算包
💻 C
📖 第 1 页 / 共 3 页
字号:
/* -*- Mode: C; c-basic-offset:4 ; -*- */
/*
 *
 *   Copyright (C) 1997 University of Chicago.
 *   See COPYRIGHT notice in top-level directory.
 */

#include "adio.h"
#include "adio_extern.h"

/* prototypes of functions used for collective writes only. */
static void ADIOI_Exch_and_write(ADIO_File fd, void *buf,
                                 MPI_Datatype datatype, int nprocs,
                                 int myrank, ADIOI_Access *others_req,
                                 ADIO_Offset *offset_list, int *len_list,
                                 int contig_access_count,
                                 ADIO_Offset min_st_offset,
                                 ADIO_Offset fd_size,
                                 ADIO_Offset *fd_start, ADIO_Offset *fd_end,
                                 int *buf_idx, int *error_code);

static void ADIOI_W_Exchange_data(ADIO_File fd, void *buf, char *write_buf,
                                  ADIOI_Flatlist_node *flat_buf,
                                  ADIO_Offset *offset_list, int *len_list,
                                  int *send_size, int *recv_size,
                                  ADIO_Offset off, int size,
                                  int *count, int *start_pos,
                                  int *partial_recv, int *sent_to_proc,
                                  int nprocs, int myrank,
                                  int buftype_is_contig,
                                  int contig_access_count,
                                  ADIO_Offset min_st_offset,
                                  ADIO_Offset fd_size,
                                  ADIO_Offset *fd_start, ADIO_Offset *fd_end,
                                  ADIOI_Access *others_req,
                                  int *send_buf_idx, int *curr_to_proc,
                                  int *done_to_proc, int *hole, int iter,
                                  MPI_Aint buftype_extent, int *buf_idx,
                                  int *error_code);

static void ADIOI_Fill_send_buffer(ADIO_File fd, void *buf,
                                   ADIOI_Flatlist_node *flat_buf,
                                   char **send_buf,
                                   ADIO_Offset *offset_list, int *len_list,
                                   int *send_size, MPI_Request *requests,
                                   int *sent_to_proc, int nprocs, int myrank,
                                   int contig_access_count,
                                   ADIO_Offset min_st_offset,
                                   ADIO_Offset fd_size,
                                   ADIO_Offset *fd_start, ADIO_Offset *fd_end,
                                   int *send_buf_idx, int *curr_to_proc,
                                   int *done_to_proc, int iter,
                                   MPI_Aint buftype_extent);

static void ADIOI_Heap_merge(ADIOI_Access *others_req, int *count,
                             ADIO_Offset *srt_off, int *srt_len,
                             int *start_pos, int nprocs, int nprocs_recv,
                             int total_elements);

/*
 * ADIOI_GEN_WriteStridedColl - generic collective strided write.
 *
 * Uses a generalized version of the extended two-phase method described
 * in "An Extended Two-Phase Method for Accessing Sections of
 * Out-of-Core Arrays", Rajeev Thakur and Alok Choudhary,
 * Scientific Programming, (5)4:301--317, Winter 1996.
 * http://www.mcs.anl.gov/home/thakur/ext2ph.ps
 *
 * Parameters:
 *   fd            - open file; fd->comm, fd->hints and fd->fp_ind are read
 *   buf           - user buffer holding the data to write
 *   count         - number of `datatype` items in buf
 *   datatype      - memory datatype of buf
 *   file_ptr_type - ADIO_EXPLICIT_OFFSET or an individual-pointer mode;
 *                   forwarded unchanged to the helpers that interpret it
 *   offset        - offset argument, interpreted together with
 *                   file_ptr_type
 *   status        - if non-NULL and HAVE_STATUS_SET_BYTES is defined, it is
 *                   filled with (type size * count) bytes; otherwise it is
 *                   only forwarded to the independent write routines
 *   error_code    - out: result of the operation (see below)
 *
 * Control flow:
 *   - If the cb_write hint is disabled, or it is "auto" and the rudimentary
 *     check finds no interleaving between neighbouring ranks, the write is
 *     delegated to ADIO_WriteContig / ADIO_WriteStrided and the function
 *     returns immediately; error_code is whatever that routine stored.
 *   - Otherwise the file is divided into file domains, requests are
 *     exchanged, ADIOI_Exch_and_write performs the write, and the error
 *     codes of all processes are combined before the temporary arrays are
 *     released.
 *
 * On the collective path fd->fp_sys_posn is set to -1 on exit.
 */
void ADIOI_GEN_WriteStridedColl(ADIO_File fd, void *buf, int count,
                                MPI_Datatype datatype, int file_ptr_type,
                                ADIO_Offset offset, ADIO_Status *status,
                                int *error_code)
{
    ADIOI_Access *my_req;
    /* array of nprocs access structures, one for each other process in
       whose file domain this process's request lies */

    ADIOI_Access *others_req;
    /* array of nprocs access structures, one for each other process
       whose request lies in this process's file domain. */

    int i, filetype_is_contig, nprocs, nprocs_for_coll, myrank;
    int contig_access_count = 0, interleave_count = 0, buftype_is_contig;
    int *count_my_req_per_proc, count_my_req_procs, count_others_req_procs;
    ADIO_Offset orig_fp, start_offset, end_offset, fd_size, min_st_offset, off;
    ADIO_Offset *offset_list = NULL, *st_offsets = NULL, *fd_start = NULL,
        *fd_end = NULL, *end_offsets = NULL;
    int *buf_idx = NULL, *len_list = NULL;
    int old_error, tmp_error;

    MPI_Comm_size(fd->comm, &nprocs);
    MPI_Comm_rank(fd->comm, &myrank);

    /* the number of processes that actually perform I/O, nprocs_for_coll,
     * is stored in the hints off the ADIO_File structure */
    nprocs_for_coll = fd->hints->cb_nodes;

    /* remembered so the individual file pointer can be put back before an
     * independent write is issued below */
    orig_fp = fd->fp_ind;

    /* only check for interleaving if cb_write isn't disabled */
    if (fd->hints->cb_write != ADIOI_HINT_DISABLE) {
        /* For this process's request, calculate the list of offsets and
           lengths in the file and determine the start and end offsets. */

        /* Note: end_offset points to the last byte-offset that will be
           accessed. e.g., if start_offset=0 and 100 bytes to be written,
           end_offset=99 */
        ADIOI_Calc_my_off_len(fd, count, datatype, file_ptr_type, offset,
                              &offset_list, &len_list, &start_offset,
                              &end_offset, &contig_access_count);

        /* each process communicates its start and end offsets to other
           processes. The result is an array each of start and end offsets
           stored in order of process rank. */
        st_offsets = (ADIO_Offset *) ADIOI_Malloc(nprocs*sizeof(ADIO_Offset));
        end_offsets = (ADIO_Offset *) ADIOI_Malloc(nprocs*sizeof(ADIO_Offset));

        MPI_Allgather(&start_offset, 1, ADIO_OFFSET, st_offsets, 1,
                      ADIO_OFFSET, fd->comm);
        MPI_Allgather(&end_offset, 1, ADIO_OFFSET, end_offsets, 1,
                      ADIO_OFFSET, fd->comm);

        /* are the accesses of different processes interleaved? */
        for (i = 1; i < nprocs; i++) {
            if ((st_offsets[i] < end_offsets[i-1]) &&
                (st_offsets[i] <= end_offsets[i])) {
                interleave_count++;
            }
        }
        /* This is a rudimentary check for interleaving, but should suffice
           for the moment. */
    }

    ADIOI_Datatype_iscontig(datatype, &buftype_is_contig);

    if (fd->hints->cb_write == ADIOI_HINT_DISABLE ||
        (!interleave_count && (fd->hints->cb_write == ADIOI_HINT_AUTO)))
    {
        /* use independent accesses */

        /* the four arrays were only allocated when the interleave check
           above actually ran */
        if (fd->hints->cb_write != ADIOI_HINT_DISABLE) {
            ADIOI_Free(offset_list);
            ADIOI_Free(len_list);
            ADIOI_Free(st_offsets);
            ADIOI_Free(end_offsets);
        }

        fd->fp_ind = orig_fp;
        ADIOI_Datatype_iscontig(fd->filetype, &filetype_is_contig);

        if (buftype_is_contig && filetype_is_contig) {
            if (file_ptr_type == ADIO_EXPLICIT_OFFSET) {
                /* convert the etype-based offset into an absolute byte
                   offset */
                off = fd->disp + (fd->etype_size) * offset;
                ADIO_WriteContig(fd, buf, count, datatype,
                                 ADIO_EXPLICIT_OFFSET,
                                 off, status, error_code);
            }
            else {
                ADIO_WriteContig(fd, buf, count, datatype, ADIO_INDIVIDUAL,
                                 0, status, error_code);
            }
        }
        else {
            ADIO_WriteStrided(fd, buf, count, datatype, file_ptr_type,
                              offset, status, error_code);
        }

        return;
    }

    /* Divide the I/O workload among "nprocs_for_coll" processes. This is
       done by (logically) dividing the file into file domains (FDs); each
       process may directly access only its own file domain. */
    ADIOI_Calc_file_domains(st_offsets, end_offsets, nprocs,
                            nprocs_for_coll, &min_st_offset,
                            &fd_start, &fd_end, &fd_size);

    /* calculate what portions of the access requests of this process are
       located in what file domains */
    ADIOI_Calc_my_req(fd, offset_list, len_list, contig_access_count,
                      min_st_offset, fd_start, fd_end, fd_size,
                      nprocs, &count_my_req_procs,
                      &count_my_req_per_proc, &my_req,
                      &buf_idx);

    /* based on everyone's my_req, calculate what requests of other
       processes lie in this process's file domain.
       count_others_req_procs = number of processes whose requests lie in
       this process's file domain (including this process itself)
       count_others_req_per_proc[i] indicates how many separate contiguous
       requests of proc. i lie in this process's file domain. */
    ADIOI_Calc_others_req(fd, count_my_req_procs,
                          count_my_req_per_proc, my_req,
                          nprocs, myrank,
                          &count_others_req_procs, &others_req);

    /* my_req and its per-process counts are not passed to the exchange
       step below, so they are released here */
    ADIOI_Free(count_my_req_per_proc);
    for (i = 0; i < nprocs; i++) {
        if (my_req[i].count) {
            ADIOI_Free(my_req[i].offsets);
            ADIOI_Free(my_req[i].lens);
        }
    }
    ADIOI_Free(my_req);

    /* exchange data and write in sizes of no more than coll_bufsize. */
    ADIOI_Exch_and_write(fd, buf, datatype, nprocs, myrank,
                         others_req, offset_list,
                         len_list, contig_access_count, min_st_offset,
                         fd_size, fd_start, fd_end, buf_idx, error_code);

    /* If this collective write is followed by an independent write,
     * it's possible to have those subsequent writes on other processes
     * race ahead and sneak in before the read-modify-write completes.
     * We carry out a collective communication at the end here so no one
     * can start independent i/o before collective I/O completes.
     *
     * need to do some gymnastics with the error codes so that if something
     * went wrong, all processes report error, but if a process has a more
     * specific error code, we can still have that process report the
     * additional information */
    old_error = *error_code;
    if (*error_code != MPI_SUCCESS) {
        *error_code = MPI_ERR_IO;
    }

    /* optimization: if only one process performing i/o, we can perform
     * a less-expensive Bcast  */
#ifdef ADIOI_MPE_LOGGING
    MPE_Log_event( ADIOI_MPE_postwrite_a, 0, NULL );
#endif
    if (fd->hints->cb_nodes == 1) {
        MPI_Bcast(error_code, 1, MPI_INT,
                  fd->hints->ranklist[0], fd->comm);
    }
    else {
        tmp_error = *error_code;
        MPI_Allreduce(&tmp_error, error_code, 1, MPI_INT,
                      MPI_MAX, fd->comm);
    }
#ifdef ADIOI_MPE_LOGGING
    MPE_Log_event( ADIOI_MPE_postwrite_b, 0, NULL );
#endif

    /* a process that saw something more specific than MPI_ERR_IO keeps
       its own code instead of the combined one */
    if ( (old_error != MPI_SUCCESS) && (old_error != MPI_ERR_IO) ) {
        *error_code = old_error;
    }

    if (!buftype_is_contig) {
        ADIOI_Delete_flattened(datatype);
    }

    /* free all memory allocated for collective I/O */
    for (i = 0; i < nprocs; i++) {
        if (others_req[i].count) {
            ADIOI_Free(others_req[i].offsets);
            ADIOI_Free(others_req[i].lens);
            ADIOI_Free(others_req[i].mem_ptrs);
        }
    }
    ADIOI_Free(others_req);

    ADIOI_Free(buf_idx);
    ADIOI_Free(offset_list);
    ADIOI_Free(len_list);
    ADIOI_Free(st_offsets);
    ADIOI_Free(end_offsets);
    ADIOI_Free(fd_start);
    ADIOI_Free(fd_end);

#ifdef HAVE_STATUS_SET_BYTES
    if (status) {
        int bufsize, size;
        /* Don't set status if it isn't needed */
        MPI_Type_size(datatype, &size);
        bufsize = size * count;
        MPIR_Status_set_bytes(status, datatype, bufsize);
    }
/* This is a temporary way of filling in status. The right way is to
   keep track of how much data was actually written during collective I/O. */
#endif

    fd->fp_sys_posn = -1;   /* set it to null. */
}

/* If successful, error_code is set to MPI_SUCCESS.  
Otherwise an error * code is created and returned in error_code. */static void ADIOI_Exch_and_write(ADIO_File fd, void *buf, MPI_Datatype				 datatype, int nprocs, 				 int myrank,				 ADIOI_Access				 *others_req, ADIO_Offset *offset_list,				 int *len_list, int contig_access_count,				 ADIO_Offset				 min_st_offset, ADIO_Offset fd_size,				 ADIO_Offset *fd_start, ADIO_Offset *fd_end,				 int *buf_idx, int *error_code){/* Send data to appropriate processes and write in sizes of no more   than coll_bufsize.       The idea is to reduce the amount of extra memory required for   collective I/O. If all data were written all at once, which is much   easier, it would require temp space more than the size of user_buf,   which is often unacceptable. For example, to write a distributed   array to a file, where each local array is 8Mbytes, requiring   at least another 8Mbytes of temp space is unacceptable. */    int hole, i, j, m, size=0, ntimes, max_ntimes, buftype_is_contig;    ADIO_Offset st_loc=-1, end_loc=-1, off, done, req_off;    char *write_buf=NULL;    int *curr_offlen_ptr, *count, *send_size, req_len, *recv_size;    int *partial_recv, *sent_to_proc, *start_pos, flag;    int *send_buf_idx, *curr_to_proc, *done_to_proc;    MPI_Status status;    ADIOI_Flatlist_node *flat_buf=NULL;    MPI_Aint buftype_extent;    int info_flag, coll_bufsize;    char *value;    static char myname[] = "ADIOI_EXCH_AND_WRITE";    *error_code = MPI_SUCCESS;  /* changed below if error */    /* only I/O errors are currently reported *//* calculate the number of writes of size coll_bufsize   to be done by each process and the max among all processes.   That gives the no. of communication phases as well. 
*/    value = (char *) ADIOI_Malloc((MPI_MAX_INFO_VAL+1)*sizeof(char));    MPI_Info_get(fd->info, "cb_buffer_size", MPI_MAX_INFO_VAL, value,                  &info_flag);    coll_bufsize = atoi(value);    ADIOI_Free(value);    for (i=0; i < nprocs; i++) {	if (others_req[i].count) {	    st_loc = others_req[i].offsets[0];	    end_loc = others_req[i].offsets[0];	    break;	}    }    for (i=0; i < nprocs; i++)	for (j=0; j < others_req[i].count; j++) {	    st_loc = ADIOI_MIN(st_loc, others_req[i].offsets[j]);	    end_loc = ADIOI_MAX(end_loc, (others_req[i].offsets[j]				       + others_req[i].lens[j] - 1));	}/* ntimes=ceiling_div(end_loc - st_loc + 1, coll_bufsize)*/    ntimes = (int) ((end_loc - st_loc + coll_bufsize)/coll_bufsize);    if ((st_loc==-1) && (end_loc==-1)) {	ntimes = 0; /* this process does no writing. */    }    MPI_Allreduce(&ntimes, &max_ntimes, 1, MPI_INT, MPI_MAX,		  fd->comm);     if (ntimes) write_buf = (char *) ADIOI_Malloc(coll_bufsize);

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -