📄 merge.c
字号:
/* * Ray2mesh : software for geophysicists. * Compute various scores attached to the mesh cells, based on geometric information that rays bring when the traverse the cell. * * Copyright (C) 2003, St閜hane Genaud and Marc Grunberg * * This tool is free software; you can redistribute it and/or * modify it under the terms of the GNU Library General Public * License as published by the Free Software Foundation; either * version 2 of the License, or (at your option) any later version. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Library General Public License for more details. * * You should have received a copy of the GNU Library General Public * License along with this library; if not, write to the Free * Software Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. */#include <sys/types.h>#include <sys/stat.h>#include <unistd.h>#include <sys/mman.h>#ifdef USE_MPI#include <mpi.h>#endif#ifdef USE_ZLIB#include <zlib.h>#endif#include <stdio.h>#include <mesh/mesh.h>#include <mesh/layer.h>#include "ray2mesh.h"#include "r2mfile.h"#include "merge.h"#include "util.h" /* for get_date_stamp() */#include "memusage.h"#define ROOT 0#define MERGE_DEBUG#ifdef USE_MPI/* check if all transmission occured flawlessly *//* if not, exit */void check_transmission_error(MPI_Status * status_tab, int nb, char *str){ int i; int test; for (i = 0; i < nb; i++) { MPI_Test_cancelled(status_tab + i, &test); if (test == 1) { fprintf(stderr, "check_transmission_error : [%d] %s", i, str); MPI_Abort(MPI_COMM_WORLD, 1); exit(1); } }}/* * ************************************************************************* * \brief prepare, send, recv, import in mesh cell data from other processes. * * Each process is assigned a domain which is a vertical mesh slice. * Prepare data cell buffers. This buffer is text formated. During this process, we scan all domains , each cell is dumped in the text buffer, then the cell is freed. 1- prepare buffer and clean cell_info data in the same time 2- send to other procs the buffer's size 3- recv buffer's size from other procs 4- send the buffer itself to other procs 5- import the buffer received return: a set of timers: timers[0] : time to write domain temp files timers[1] : time to complete receive of all domain sizes after start_time+timers[0] timers[2] : time to complete send of all domain sizes after start_time+timers[0] timers[3] : time to emit Irecv for all domains timers[4] : time to emit Isend for all domains timers[5] : time to complete Send after Isend emitted timers[6] : time to complete Recv after Isend emitted timers[7] : time between start_time_sendrecv and Send+Recv are complete timers[8] : time after Barrier() -> return from function timers[9] : time spent for I/O (load_file_to_memory + import_cell_buffer); ******************************************************************* */double *send_recv_cell_info(MPI_Comm com, struct cell_info_t **** cell_info, struct mesh_t * mesh, char *tmpdir){ MPI_Request *req_send_tab; /* array of handler (isend) */ MPI_Request *req_recv_tab; /* array of handler (irecv) */ MPI_Status *status_recv_tab; /* array of status (irecv) */ MPI_Status *status_send_tab; /* array of status (isend) */ double *timers; double start_time; double start_time_sendrecv; double start_time_io; char *date_stamp; int rank, nbprocs; int ireq; int tag_size = 1; /* mpi msg id */ int tag_buff = 2; /* mpi msg id */ int dom; /* domain ID */ int buff_size; /* size txt cell_info buffer */ struct stat filestat; struct domain_info_t { /* domain information */ char file[256]; char *buffmap; /* mmap file buffer */ char *buff; /* domain cell info buffer */ int size_send; /* buff size */ int size_recv; /* buff size */ } *dominfo;#ifdef USE_ZLIB char *zbuf; int zbuf_size;#endif timers = (double *) calloc (10, sizeof(double)); assert(timers); start_time = MPI_Wtime(); MPI_Comm_rank(com, &rank); MPI_Comm_size(com, &nbprocs); fprintf(stdout, "Merge started with proc %d/%d\n", rank, nbprocs); fflush(stdout); /* Alloc */ req_send_tab = (MPI_Request *) calloc(nbprocs - 1, sizeof(MPI_Request)); if (req_send_tab == NULL) { MPI_Abort(MPI_COMM_WORLD, 1); exit(1); } req_recv_tab = (MPI_Request *) calloc(nbprocs - 1, sizeof(MPI_Request)); if (req_recv_tab == NULL) { MPI_Abort(MPI_COMM_WORLD, 1); exit(1); } status_send_tab = (MPI_Status *) calloc(nbprocs - 1, sizeof(MPI_Status)); if (status_send_tab == NULL) { MPI_Abort(MPI_COMM_WORLD, 1); exit(1); } status_recv_tab = (MPI_Status *) calloc(nbprocs - 1, sizeof(MPI_Status)); if (status_recv_tab == NULL) { MPI_Abort(MPI_COMM_WORLD, 1); exit(1); } dominfo = (struct domain_info_t *) malloc(nbprocs * sizeof(struct domain_info_t)); if (dominfo == NULL) { MPI_Abort(MPI_COMM_WORLD, 1); exit(1); } /*****************************************************************************/ /* * create data for each domain, save them to disk, and free them from * memory */ /* Send buffer size */ /*****************************************************************************/ ireq = 0; for (dom = 0; dom < nbprocs; dom++) { dominfo[dom].buffmap = NULL; dominfo[dom].buff = NULL; dominfo[dom].size_send = 0; dominfo[dom].size_recv = 0; if (dom == rank) { strcpy(dominfo[dom].file, ""); continue; } /* create file with domain cell info and clean the */ /* corresponding cell from memory */#ifdef USE_ZLIB snprintf(dominfo[dom].file, 256, "%s/p%d-domain-%d.dom.gz", tmpdir, rank, dom);#else snprintf(dominfo[dom].file, 256, "%s/p%d-domain-%d.dom", tmpdir, rank, dom);#endif make_domain_info_file(dominfo[dom].file, cell_info, mesh, dom, nbprocs, rank); stat(dominfo[dom].file, &filestat); dominfo[dom].size_send = filestat.st_size;#ifdef MERGE_DEBUG date_stamp = get_date_stamp(); fprintf(stdout, "*** [process %d] sends size(%d) to [process %d] *** (%s)\n", rank, dominfo[dom].size_send, dom, date_stamp); fflush(stdout); free(date_stamp);#endif /* send the buffer size to the procs 'dom' which is dedicated */ /* to work on the 'dom' domain */ MPI_Isend(&(dominfo[dom].size_send), 1, MPI_INT, dom, tag_size, com, req_send_tab + ireq); ireq++; } timers[0] = MPI_Wtime() - start_time; /***************************************/ /* recv buffer's size from other procs */ /***************************************/ ireq = 0; for (dom = 0; dom < nbprocs; dom++) { if (dom == rank) continue; MPI_Irecv(&(dominfo[dom].size_recv), 1, MPI_INT, dom, tag_size, com, req_recv_tab + ireq); ireq++; } /***************************/ /* wait to recv all message */ /***************************/ MPI_Waitall(ireq, req_recv_tab, status_recv_tab); check_transmission_error(status_recv_tab, ireq, "MPI_Irecv (size)\n"); /* all domain sizes received */ timers[1] = MPI_Wtime() - timers[0] - start_time;#ifdef MERGE_DEBUG date_stamp = get_date_stamp(); fprintf(stdout, "*** [process %d] MPI_Irecv domain size from %d other processes ***(%s)\n", rank, ireq, date_stamp); fflush(stdout); free(date_stamp);#endif /********************************************/ /* wait to be sure that all message is sent */ /********************************************/ MPI_Waitall(ireq, req_send_tab, status_send_tab); check_transmission_error(status_send_tab, ireq, "MPI_Isend (size)\n"); /* all domain sizes sent */ timers[2] = MPI_Wtime() - timers[1] - start_time;#ifdef MERGE_DEBUG date_stamp = get_date_stamp(); fprintf(stdout, "*** [process %d] MPI_Isend domain sizes to all other processes (%d) *** (%s)\n", rank, ireq, date_stamp); fflush(stdout); free(date_stamp);#endif //MPI_Barrier(com); /* reset to zero */ /* * memset(req_recv_tab, 0, sizeof(MPI_Request)*(nbprocs-1)); * memset(req_send_tab, 0, sizeof(MPI_Request)*(nbprocs-1)); */ /* mem info */ fprintf(stdout, "*** [process %d] starting data exange with MEM=%.2f\n", rank, get_mem_usage()); fflush(stdout); /*****************************************************************/ /* recv buffer must be in a recv wait state before sending data */ /* when buffer are > 40K */ /*****************************************************************/ start_time_sendrecv = MPI_Wtime(); ireq = 0; for (dom = 0; dom < nbprocs; dom++) { if (dom == rank) continue; /* allocation to recv the buffer */ dominfo[dom].buff = (char *) malloc(sizeof(char) * dominfo[dom].size_recv); if (dominfo[dom].buff == NULL) { fprintf(stderr, "%s : could not allocate %d bytes. Exiting.\n", PACKAGE, dominfo[dom].size_recv); MPI_Abort(MPI_COMM_WORLD, 1); }#ifdef MERGE_DEBUG date_stamp = get_date_stamp(); fprintf(stdout, "*** [process %d] waits buffmap from [process %d] (size = %d) (%s)\n", rank, dom, dominfo[dom].size_recv, date_stamp); fflush(stdout); free(date_stamp);#endif /* start Irecv data */ MPI_Irecv(dominfo[dom].buff, dominfo[dom].size_recv, MPI_UNSIGNED_CHAR, dom, tag_buff, com, req_recv_tab + ireq); ireq++; } timers[3] = MPI_Wtime() - start_time_sendrecv; /* mem info */ fprintf(stdout, "*** [process %d] all data buffers space to be received are allocated, MEM=%.2f\n", rank, get_mem_usage()); fflush(stdout); /****************/ /* send buffer */ /****************/ ireq = 0; for (dom = 0; dom < nbprocs; dom++) { if (dom == rank) continue; start_time_io = MPI_Wtime(); /* load domain cell info to mem and send it */ if ((dominfo[dom].buffmap = load_file_to_memory(dominfo[dom].file, &buff_size)) == NULL) { fprintf(stderr, "%s : could not load %s to memory (%d bytes). Check free space on disk. Exiting.\n", PACKAGE, dominfo[dom].file, buff_size); MPI_Abort(MPI_COMM_WORLD, 1); } timers[9] += MPI_Wtime() - start_time_io;#ifdef MERGE_DEBUG date_stamp = get_date_stamp(); fprintf(stdout, "*** [process %d] sends buffmap to [process %d] (size = %d/%d) *** (%s)\n", rank, dom, buff_size, dominfo[dom].size_send, date_stamp); fflush(stdout); free(date_stamp);#endif MPI_Isend(dominfo[dom].buffmap, dominfo[dom].size_send, MPI_UNSIGNED_CHAR, dom, tag_buff, com, req_send_tab + ireq); ireq++; } /* all Isend have been emitted */ timers[4] = MPI_Wtime() - timers[3] - start_time_sendrecv;#ifdef MERGE_DEBUG date_stamp = get_date_stamp(); fprintf(stdout, "*** [process %d] sent all buffmap to all other processes (%d) *** (%s)\n", rank, ireq, date_stamp); fflush(stdout); free(date_stamp);#endif /* mem info */ fprintf(stdout, "*** [process %d] all data buffers space to be send are allocated, MEM=%.2f\n", rank, get_mem_usage()); fflush(stdout); /****************/ /* Manage data */ /***************/ { int send_status = 1; int recv_status = 1; int send_proc_id, recv_proc_id; MPI_Status sstatus, rstatus; while (send_status || recv_status) { if (send_status) { /* if data are sent -> clean them */ MPI_Waitany(ireq, req_send_tab, &send_proc_id, &sstatus); if (send_proc_id == MPI_UNDEFINED) { send_status = 0; /* from Isend to sending complete */ timers[5] = MPI_Wtime() - timers[4] - timers[3] - start_time_sendrecv; } else { if (send_proc_id >= rank) { send_proc_id++; }#ifdef MERGE_DEBUG date_stamp = get_date_stamp(); fprintf(stdout, "*** [process %d] MPI_Isend (buffer) to [process %d] OK *** (%s)\n", rank, send_proc_id, date_stamp); fflush(stdout); free(date_stamp);#endif munmap(dominfo[send_proc_id].buffmap, dominfo[send_proc_id].size_send); unlink(dominfo[send_proc_id].file); dominfo[send_proc_id].buffmap = NULL; } } if (recv_status) { /* if data are received -> process them */ MPI_Waitany(ireq, req_recv_tab, &recv_proc_id, &rstatus); if (recv_proc_id == MPI_UNDEFINED) { recv_status = 0; /* from Isend to receiving complete */ timers[6] = MPI_Wtime() - timers[4] - timers[3] - start_time_sendrecv; } else { if (recv_proc_id >= rank) { recv_proc_id++; }#ifdef MERGE_DEBUG date_stamp = get_date_stamp(); fprintf(stdout, "*** [process %d] MPI_Irecv (buffer) from [process %d] OK *** (%s)\n", rank, recv_proc_id, date_stamp); fflush(stdout); free(date_stamp);#endif start_time_io= MPI_Wtime();#ifdef USE_ZLIB zbuf = dominfo[recv_proc_id].buff; zbuf_size = dominfo[recv_proc_id].size_recv; dominfo[recv_proc_id].buff = decompress_cell_data_gzbuffer( cell_info, zbuf, zbuf_size, &(dominfo[recv_proc_id].size_recv), rank); free(zbuf);#endif import_cell_buffer(cell_info, dominfo[recv_proc_id].buff, dominfo[recv_proc_id].size_recv, -1, -1, /* import all layers */ rank, recv_proc_id); free(dominfo[recv_proc_id].buff); timers[9] += MPI_Wtime() - start_time_io; fprintf(stdout, "*** [process %d] after buffer[%d] importation, MEM=%.2f\n", rank, recv_proc_id, get_mem_usage()); fflush(stdout); } } } } date_stamp = get_date_stamp(); fprintf(stdout, "*** [process %d] DATA EXCHANGE COMPLETED, MEM=%.2f (%s)\n", rank, get_mem_usage(), date_stamp); fflush(stdout); free(date_stamp); /******************/ /* cleaning stuff */ /******************/ free(req_send_tab); free(status_send_tab); free(req_recv_tab); free(status_recv_tab); free(dominfo); /* from Isend to send/receive complete */ timers[7] = (MPI_Wtime() - start_time_sendrecv); MPI_Barrier(com); /* full send_receive operation */ timers[8] = (MPI_Wtime() - start_time); return (timers);}#endif
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -