
📄 merge.c

📁 For 2-D ray tracing
💻 C
/*
 * Ray2mesh : software for geophysicists.
 * Compute various scores attached to the mesh cells, based on geometric
 * information that rays bring when they traverse the cell.
 *
 * Copyright (C) 2003, Stéphane Genaud and Marc Grunberg
 *
 * This tool is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Library General Public
 * License as published by the Free Software Foundation; either
 * version 2 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Library General Public License for more details.
 *
 * You should have received a copy of the GNU Library General Public
 * License along with this library; if not, write to the Free
 * Software Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
 */

#include <sys/types.h>
#include <sys/stat.h>
#include <unistd.h>
#include <sys/mman.h>
#ifdef USE_MPI
#include <mpi.h>
#endif
#ifdef USE_ZLIB
#include <zlib.h>
#endif
#include <stdio.h>
#include <stdlib.h>		/* for calloc(), malloc(), free(), exit() */
#include <string.h>		/* for strcpy() */
#include <assert.h>		/* for assert() */
#include <mesh/mesh.h>
#include <mesh/layer.h>
#include "ray2mesh.h"
#include "r2mfile.h"
#include "merge.h"
#include "util.h"		/* for get_date_stamp() */
#include "memusage.h"

#define ROOT 0
#define MERGE_DEBUG

#ifdef USE_MPI

/* check that all transmissions occurred flawlessly; */
/* if not, exit                                      */
void
check_transmission_error(MPI_Status *status_tab, int nb, char *str)
{
	int             i;
	int             test;

	for (i = 0; i < nb; i++) {
		MPI_Test_cancelled(status_tab + i, &test);
		if (test == 1) {
			fprintf(stderr, "check_transmission_error : [%d] %s", i, str);
			MPI_Abort(MPI_COMM_WORLD, 1);
			exit(1);
		}
	}
}

/* *************************************************************************
 * \brief Prepare, send, receive, and import into the mesh the cell data
 *        from the other processes.
 *
 * Each process is assigned a domain which is a vertical mesh slice.
 * Prepare the cell data buffers; each buffer is text formatted. During this
 * step we scan all domains: each cell is dumped into the text buffer, then
 * the cell is freed.
 *   1- prepare the buffer and clean the cell_info data at the same time
 *   2- send the buffer sizes to the other procs
 *   3- receive the buffer sizes from the other procs
 *   4- send the buffers themselves to the other procs
 *   5- import the buffers received
 *
 * return: a set of timers:
 *   timers[0] : time to write domain temp files
 *   timers[1] : time to complete receive of all domain sizes after start_time+timers[0]
 *   timers[2] : time to complete send of all domain sizes after start_time+timers[0]
 *   timers[3] : time to emit Irecv for all domains
 *   timers[4] : time to emit Isend for all domains
 *   timers[5] : time to complete Send after Isend emitted
 *   timers[6] : time to complete Recv after Isend emitted
 *   timers[7] : time between start_time_sendrecv and Send+Recv completion
 *   timers[8] : time after Barrier() -> return from function
 *   timers[9] : time spent on I/O (load_file_to_memory + import_cell_buffer)
 ******************************************************************* */
double *
send_recv_cell_info(MPI_Comm com, struct cell_info_t ****cell_info,
		    struct mesh_t *mesh, char *tmpdir)
{
	MPI_Request    *req_send_tab;	/* array of handles (Isend)  */
	MPI_Request    *req_recv_tab;	/* array of handles (Irecv)  */
	MPI_Status     *status_recv_tab;	/* array of statuses (Irecv) */
	MPI_Status     *status_send_tab;	/* array of statuses (Isend) */
	double         *timers;
	double          start_time;
	double          start_time_sendrecv;
	double          start_time_io;
	char           *date_stamp;
	int             rank, nbprocs;
	int             ireq;
	int             tag_size = 1;	/* mpi msg id */
	int             tag_buff = 2;	/* mpi msg id */
	int             dom;		/* domain ID */
	int             buff_size;	/* size of the text cell_info buffer */
	struct stat     filestat;
	struct domain_info_t {		/* domain information */
		char            file[256];
		char           *buffmap;	/* mmap file buffer */
		char           *buff;		/* domain cell info buffer */
		int             size_send;	/* buff size */
		int             size_recv;	/* buff size */
	}              *dominfo;
#ifdef USE_ZLIB
	char           *zbuf;
	int             zbuf_size;
#endif

	timers = (double *) calloc(10, sizeof(double));
	assert(timers);
	start_time = MPI_Wtime();
	MPI_Comm_rank(com, &rank);
	MPI_Comm_size(com, &nbprocs);
	fprintf(stdout, "Merge started with proc %d/%d\n", rank, nbprocs);
	fflush(stdout);

	/* Alloc */
	req_send_tab = (MPI_Request *) calloc(nbprocs - 1, sizeof(MPI_Request));
	if (req_send_tab == NULL) {
		MPI_Abort(MPI_COMM_WORLD, 1);
		exit(1);
	}
	req_recv_tab = (MPI_Request *) calloc(nbprocs - 1, sizeof(MPI_Request));
	if (req_recv_tab == NULL) {
		MPI_Abort(MPI_COMM_WORLD, 1);
		exit(1);
	}
	status_send_tab = (MPI_Status *) calloc(nbprocs - 1, sizeof(MPI_Status));
	if (status_send_tab == NULL) {
		MPI_Abort(MPI_COMM_WORLD, 1);
		exit(1);
	}
	status_recv_tab = (MPI_Status *) calloc(nbprocs - 1, sizeof(MPI_Status));
	if (status_recv_tab == NULL) {
		MPI_Abort(MPI_COMM_WORLD, 1);
		exit(1);
	}
	dominfo = (struct domain_info_t *)
		malloc(nbprocs * sizeof(struct domain_info_t));
	if (dominfo == NULL) {
		MPI_Abort(MPI_COMM_WORLD, 1);
		exit(1);
	}

	/*************************************************************************/
	/* Create the data for each domain, save them to disk, and free them    */
	/* from memory. Then send each buffer size.                              */
	/*************************************************************************/
	ireq = 0;
	for (dom = 0; dom < nbprocs; dom++) {
		dominfo[dom].buffmap = NULL;
		dominfo[dom].buff = NULL;
		dominfo[dom].size_send = 0;
		dominfo[dom].size_recv = 0;
		if (dom == rank) {
			strcpy(dominfo[dom].file, "");
			continue;
		}
		/* create the file with the domain cell info and clean the */
		/* corresponding cells from memory */
#ifdef USE_ZLIB
		snprintf(dominfo[dom].file, 256, "%s/p%d-domain-%d.dom.gz",
			 tmpdir, rank, dom);
#else
		snprintf(dominfo[dom].file, 256, "%s/p%d-domain-%d.dom",
			 tmpdir, rank, dom);
#endif
		make_domain_info_file(dominfo[dom].file, cell_info, mesh, dom,
				      nbprocs, rank);
		stat(dominfo[dom].file, &filestat);
		dominfo[dom].size_send = filestat.st_size;
#ifdef MERGE_DEBUG
		date_stamp = get_date_stamp();
		fprintf(stdout,
			"*** [process %d] sends size(%d) to [process %d] *** (%s)\n",
			rank, dominfo[dom].size_send, dom, date_stamp);
		fflush(stdout);
		free(date_stamp);
#endif
		/* send the buffer size to the proc 'dom' which is dedicated */
		/* to work on the 'dom' domain */
		MPI_Isend(&(dominfo[dom].size_send), 1, MPI_INT, dom, tag_size,
			  com, req_send_tab + ireq);
		ireq++;
	}
	timers[0] = MPI_Wtime() - start_time;

	/*****************************************/
	/* receive buffer sizes from other procs */
	/*****************************************/
	ireq = 0;
	for (dom = 0; dom < nbprocs; dom++) {
		if (dom == rank)
			continue;
		MPI_Irecv(&(dominfo[dom].size_recv), 1, MPI_INT, dom, tag_size,
			  com, req_recv_tab + ireq);
		ireq++;
	}

	/********************************/
	/* wait to receive all messages */
	/********************************/
	MPI_Waitall(ireq, req_recv_tab, status_recv_tab);
	check_transmission_error(status_recv_tab, ireq, "MPI_Irecv (size)\n");
	/* all domain sizes received */
	timers[1] = MPI_Wtime() - timers[0] - start_time;
#ifdef MERGE_DEBUG
	date_stamp = get_date_stamp();
	fprintf(stdout,
		"*** [process %d] MPI_Irecv domain size from %d other processes *** (%s)\n",
		rank, ireq, date_stamp);
	fflush(stdout);
	free(date_stamp);
#endif

	/**********************************************/
	/* wait to be sure that all messages are sent */
	/**********************************************/
	MPI_Waitall(ireq, req_send_tab, status_send_tab);
	check_transmission_error(status_send_tab, ireq, "MPI_Isend (size)\n");
	/* all domain sizes sent */
	timers[2] = MPI_Wtime() - timers[1] - start_time;
#ifdef MERGE_DEBUG
	date_stamp = get_date_stamp();
	fprintf(stdout,
		"*** [process %d] MPI_Isend domain sizes to all other processes (%d) *** (%s)\n",
		rank, ireq, date_stamp);
	fflush(stdout);
	free(date_stamp);
#endif

	/* MPI_Barrier(com); */
	/* reset to zero */
	/*
	 * memset(req_recv_tab, 0, sizeof(MPI_Request)*(nbprocs-1));
	 * memset(req_send_tab, 0, sizeof(MPI_Request)*(nbprocs-1));
	 */

	/* mem info */
	fprintf(stdout,
		"*** [process %d] starting data exchange with MEM=%.2f\n",
		rank, get_mem_usage());
	fflush(stdout);

	/*****************************************************************/
	/* the recv buffers must be in a recv wait state before data is  */
	/* sent, when buffers are > 40K                                  */
	/*****************************************************************/
	start_time_sendrecv = MPI_Wtime();
	ireq = 0;
	for (dom = 0; dom < nbprocs; dom++) {
		if (dom == rank)
			continue;
		/* allocation to receive the buffer */
		dominfo[dom].buff = (char *)
			malloc(sizeof(char) * dominfo[dom].size_recv);
		if (dominfo[dom].buff == NULL) {
			fprintf(stderr, "%s : could not allocate %d bytes. Exiting.\n",
				PACKAGE, dominfo[dom].size_recv);
			MPI_Abort(MPI_COMM_WORLD, 1);
		}
#ifdef MERGE_DEBUG
		date_stamp = get_date_stamp();
		fprintf(stdout,
			"*** [process %d] waits buffmap from [process %d] (size = %d) (%s)\n",
			rank, dom, dominfo[dom].size_recv, date_stamp);
		fflush(stdout);
		free(date_stamp);
#endif
		/* start Irecv data */
		MPI_Irecv(dominfo[dom].buff, dominfo[dom].size_recv,
			  MPI_UNSIGNED_CHAR, dom, tag_buff, com,
			  req_recv_tab + ireq);
		ireq++;
	}
	timers[3] = MPI_Wtime() - start_time_sendrecv;

	/* mem info */
	fprintf(stdout,
		"*** [process %d] all data buffers to be received are allocated, MEM=%.2f\n",
		rank, get_mem_usage());
	fflush(stdout);

	/***************/
	/* send buffer */
	/***************/
	ireq = 0;
	for (dom = 0; dom < nbprocs; dom++) {
		if (dom == rank)
			continue;
		start_time_io = MPI_Wtime();
		/* load the domain cell info into memory and send it */
		if ((dominfo[dom].buffmap =
		     load_file_to_memory(dominfo[dom].file, &buff_size)) == NULL) {
			fprintf(stderr,
				"%s : could not load %s to memory (%d bytes). Check free space on disk. Exiting.\n",
				PACKAGE, dominfo[dom].file, buff_size);
			MPI_Abort(MPI_COMM_WORLD, 1);
		}
		timers[9] += MPI_Wtime() - start_time_io;
#ifdef MERGE_DEBUG
		date_stamp = get_date_stamp();
		fprintf(stdout,
			"*** [process %d] sends buffmap to [process %d] (size = %d/%d) *** (%s)\n",
			rank, dom, buff_size, dominfo[dom].size_send, date_stamp);
		fflush(stdout);
		free(date_stamp);
#endif
		MPI_Isend(dominfo[dom].buffmap, dominfo[dom].size_send,
			  MPI_UNSIGNED_CHAR, dom, tag_buff, com,
			  req_send_tab + ireq);
		ireq++;
	}
	/* all Isend have been emitted */
	timers[4] = MPI_Wtime() - timers[3] - start_time_sendrecv;
#ifdef MERGE_DEBUG
	date_stamp = get_date_stamp();
	fprintf(stdout,
		"*** [process %d] sent all buffmap to all other processes (%d) *** (%s)\n",
		rank, ireq, date_stamp);
	fflush(stdout);
	free(date_stamp);
#endif

	/* mem info */
	fprintf(stdout,
		"*** [process %d] all data buffers to be sent are allocated, MEM=%.2f\n",
		rank, get_mem_usage());
	fflush(stdout);

	/***************/
	/* Manage data */
	/***************/
	{
		int             send_status = 1;
		int             recv_status = 1;
		int             send_proc_id, recv_proc_id;
		MPI_Status      sstatus, rstatus;

		while (send_status || recv_status) {
			if (send_status) {
				/* once data has been sent -> clean it up */
				MPI_Waitany(ireq, req_send_tab, &send_proc_id, &sstatus);
				if (send_proc_id == MPI_UNDEFINED) {
					send_status = 0;
					/* from Isend to sending complete */
					timers[5] = MPI_Wtime() - timers[4] - timers[3] - start_time_sendrecv;
				} else {
					/* map the request index back to the domain ID */
					if (send_proc_id >= rank) {
						send_proc_id++;
					}
#ifdef MERGE_DEBUG
					date_stamp = get_date_stamp();
					fprintf(stdout,
						"*** [process %d] MPI_Isend (buffer) to [process %d] OK *** (%s)\n",
						rank, send_proc_id, date_stamp);
					fflush(stdout);
					free(date_stamp);
#endif
					munmap(dominfo[send_proc_id].buffmap, dominfo[send_proc_id].size_send);
					unlink(dominfo[send_proc_id].file);
					dominfo[send_proc_id].buffmap = NULL;
				}
			}
			if (recv_status) {
				/* once data has been received -> process it */
				MPI_Waitany(ireq, req_recv_tab, &recv_proc_id, &rstatus);
				if (recv_proc_id == MPI_UNDEFINED) {
					recv_status = 0;
					/* from Isend to receiving complete */
					timers[6] = MPI_Wtime() - timers[4] - timers[3] - start_time_sendrecv;
				} else {
					/* map the request index back to the domain ID */
					if (recv_proc_id >= rank) {
						recv_proc_id++;
					}
#ifdef MERGE_DEBUG
					date_stamp = get_date_stamp();
					fprintf(stdout,
						"*** [process %d] MPI_Irecv (buffer) from [process %d] OK *** (%s)\n",
						rank, recv_proc_id, date_stamp);
					fflush(stdout);
					free(date_stamp);
#endif
					start_time_io = MPI_Wtime();
#ifdef USE_ZLIB
					zbuf = dominfo[recv_proc_id].buff;
					zbuf_size = dominfo[recv_proc_id].size_recv;
					dominfo[recv_proc_id].buff = decompress_cell_data_gzbuffer(
						cell_info, zbuf, zbuf_size,
						&(dominfo[recv_proc_id].size_recv), rank);
					free(zbuf);
#endif
					import_cell_buffer(cell_info,
							   dominfo[recv_proc_id].buff,
							   dominfo[recv_proc_id].size_recv,
							   -1, -1,	/* import all layers */
							   rank, recv_proc_id);
					free(dominfo[recv_proc_id].buff);
					timers[9] += MPI_Wtime() - start_time_io;
					fprintf(stdout,
						"*** [process %d] after buffer[%d] importation, MEM=%.2f\n",
						rank, recv_proc_id, get_mem_usage());
					fflush(stdout);
				}
			}
		}
	}

	date_stamp = get_date_stamp();
	fprintf(stdout, "*** [process %d] DATA EXCHANGE COMPLETED, MEM=%.2f (%s)\n",
		rank, get_mem_usage(), date_stamp);
	fflush(stdout);
	free(date_stamp);

	/******************/
	/* cleaning stuff */
	/******************/
	free(req_send_tab);
	free(status_send_tab);
	free(req_recv_tab);
	free(status_recv_tab);
	free(dominfo);

	/* from Isend to send/receive complete */
	timers[7] = (MPI_Wtime() - start_time_sendrecv);
	MPI_Barrier(com);
	/* full send_receive operation */
	timers[8] = (MPI_Wtime() - start_time);
	return (timers);
}

#endif				/* USE_MPI */
