📄 vpriv.c
字号:
/** @file vpriv.c implements the driver functions */
/*
  MPICH-V
  Copyright (C) 2002, 2003 Groupe Cluster et Grid, LRI, Universite de Paris Sud

  This file is part of MPICH-V.

  MPICH-V is free software; you can redistribute it and/or modify
  it under the terms of the GNU General Public License as published by
  the Free Software Foundation; either version 2 of the License, or
  (at your option) any later version.

  MPICH-V is distributed in the hope that it will be useful,
  but WITHOUT ANY WARRANTY; without even the implied warranty of
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  GNU General Public License for more details.

  You should have received a copy of the GNU General Public License
  along with MPICH-V; if not, write to the Free Software
  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA

  $Id: vpriv.c,v 1.41 2006/01/24 19:35:01 rodrigue Exp $
*/

#include "config.h"          /* for configuration (lvl of debugging, checkpoint, etc...) */
#include "debug.h"           /* for debugging */
#include "utils_socket.h"    /* for _uread and _uwrite */
#include "vprivcheckpoint.h" /* for checkpointing hooks */
#include "vtypes.h"

#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <sys/time.h>
#include <unistd.h>
#include <string.h>     /* for mem functions */
#include <signal.h>
#include <errno.h>
#include <stdlib.h>
#include <stdio.h>      /* fopen, fprintf, snprintf */
#include <stdint.h>     /* uint32_t */
#include <arpa/inet.h>  /* htonl, ntohl */

static char PipeName[128]; /**< basename of the pipe files */
static int WTD;            /**< file descriptor of pipe to write to */
static int RFD;            /**< file descriptor of pipe to read from */
static int iLastRecvFrom;  /**< rank of last reception */
static int GROUP;          /**< my group */

extern int MPID_MyWorldSize; /**< my world size */
extern int MPID_MyWorldRank; /**< my rank */

/** identifier handed to initDebug(); rebuilt from the rank in v_init() */
static char debug_uid[32];

#if 0
#include <asm/unistd.h>
static int v_gettimeofday(struct timeval *a1, struct timezone *a2)
{
  long res;
  __asm__ volatile ("int $0x80"
                    : "=a" (res)
                    : "0" (__NR_gettimeofday), "b" ((long)(a1)), "c" ((long)(a2)));
  if ((unsigned long)(res) >= (unsigned long)(-125)) {
    errno = -(res);
    res = -1;
  }
  return (int) (res);
}
#else
#warning "Using libc gettimeofday. Expect timing errors with condor!"
#define v_gettimeofday gettimeofday
#endif

static struct timeval tvClient = {0, 0}; /**< time of the client start */

#ifdef PROFILE
static struct timeval ProfileProbe;
static int nbcallProbe;
static struct timeval ProfileSend;
static int nbcallSend;
static struct timeval ProfileRecv;
static int nbcallRecv;
static struct timeval ProfileTV;

/** count one call and remember when it started */
static inline void enter_function(int *nb)
{
  (*nb)++;
  gettimeofday(&ProfileTV, NULL);
}

/** add the time elapsed since the matching enter_function() to *tv */
static inline void exit_function(struct timeval *tv)
{
  struct timeval now;
  gettimeofday(&now, NULL);
  timersub(&now, &ProfileTV, &now);
  timeradd(&now, tv, tv);
}

/** print call counts, cumulated and average times (registered with atexit) */
static void display_profile(void)
{
  /* time_t / suseconds_t are not int: cast to long and zero-pad the usec part */
  printp("nbprobe : %d, timeprobe : %ld.%06ld, average timeprobe : %gs",
         nbcallProbe, (long)ProfileProbe.tv_sec, (long)ProfileProbe.tv_usec,
         (1000000.0*ProfileProbe.tv_sec+(double)ProfileProbe.tv_usec)/(1000000.0*(double)nbcallProbe));
  printp("nbsend : %d, timesend : %ld.%06ld, average timesend : %gs",
         nbcallSend, (long)ProfileSend.tv_sec, (long)ProfileSend.tv_usec,
         (1000000.0*ProfileSend.tv_sec+(double)ProfileSend.tv_usec)/(1000000.0*(double)nbcallSend));
  printp("nbrecv : %d, timerecv : %ld.%06ld, average timerecv : %gs",
         nbcallRecv, (long)ProfileRecv.tv_sec, (long)ProfileRecv.tv_usec,
         (1000000.0*ProfileRecv.tv_sec+(double)ProfileRecv.tv_usec)/(1000000.0*(double)nbcallRecv));
}
#else
#define enter_function(a) do {} while(0)
#define exit_function(a) do {} while(0)
#define display_profile() do {} while(0)
#endif

/** try to perform a checkpoint if one is ordered by the daemon
 *
 * @param iType the iType field (host byte order) of a packet header
 *              returned by the daemon
 *
 * @return 1 if a checkpoint was performed, 0 if iType is not a checkpoint
 *         order, -1 if a checkpoint was ordered but failed
 */
static inline int ckpt_check(int iType)
{
  pkt_header sPkt;
  int n;

  printi("ckpt", "ckpt_check iType=%d", iType);

  if (iType == CP_START) {
    printi("ckpt", "checkpoint ordered ");
    if (vprivcheckpoint_performcheckpoint(0) == -1) {
      printe("ERROR sending checkpoint");
      return -1;
    }
    return 1;
  }

  if (iType == CP_START_NO_FORK) {
    printi("ckpt", "sequential checkpoint ordered");
    if (vprivcheckpoint_performcheckpoint(1) == -1) {
      printe("ERROR sending checkpoint");
      return -1;
    }
    /* one more header is read from the daemon after a sequential
       checkpoint; DEVICE_EXIT in it terminates this process */
    memset(&sPkt, 0, sizeof(sPkt));
    n = _uread(RFD, &sPkt, sizeof(sPkt));
    if (n < (int)sizeof(sPkt)) {
      printe("ERROR reading from pipe after checkpoint (%d bytes)", n);
      return -1;
    }
    if (ntohl(sPkt.iType) == DEVICE_EXIT) {
      printi("ckpt", "Exit (crash) ordered");
      close(RFD);
      close(WTD);
      exit(0);
    }
    return 1;
  }

  return 0;
}

/** establishes a connection with the daemon.
 *
 * Opens "<PipeName>.Daemon2Driver" for reading (RFD) and
 * "<PipeName>.Driver2Daemon" for writing (WTD); PipeName must be set first.
 * Calls qerror() if either open fails.
 */
static void connect_my_daemon(void)
{
  /* both suffixes are 14 characters; +16 leaves room for the terminator */
  char filename[strlen(PipeName) + 16];

  snprintf(filename, sizeof(filename), "%s.Daemon2Driver", PipeName);
  printi("init", "opening %s for reading", filename);
  RFD = open(filename, O_RDONLY);
  if (RFD < 0)
    qerror("ERROR opening the pipe from the daemon");

  snprintf(filename, sizeof(filename), "%s.Driver2Daemon", PipeName);
  printi("init", "opening %s for writing", filename);
  WTD = open(filename, O_WRONLY);
  if (WTD < 0)
    qerror("ERROR opening the pipe to the daemon");
}

/** probes the presence of messages
 *
 * Sends an ANY_MSG_AVAIL request to the daemon and reads one header back.
 *
 * @param tag message id
 *
 * @return 0 if the iTag field of the reply is zero (no message available),
 *         1 otherwise
 */
int v_probe(int tag)
{
  int n, result;
  pkt_header sPkt;

  /* always zero the header so no uninitialised bytes reach the daemon */
  memset(&sPkt, 0, sizeof(pkt_header));
  enter_function(&nbcallProbe);

  sPkt.iTag  = htonl(tag);
  sPkt.iSrc  = htonl(-1);               /* any src */
  sPkt.iDst  = htonl(MPID_MyWorldRank); /* dst is my rank */
  sPkt.iSize = 0;                       /* just clear this field */
  sPkt.iType = htonl(ANY_MSG_AVAIL);

  printi("probe_req", "PROBE starts...");
  n = _uwrite(WTD, &sPkt, sizeof(pkt_header));
  if (n < (int)sizeof(pkt_header))
    qerror("ERROR writing to pipe");
  printi("probe_req", "probe has sent packet %s", format_packet_long(&sPkt, 48));

  n = _uread(RFD, &sPkt, sizeof(pkt_header));
  if (n < 0)
    qerror("ERROR reading from pipe");
  if (n == 0)
    qerror("connection closed by the daemon");

  /* iTag carries the number of available control messages; only
     zero / non-zero matters, so byte order is irrelevant here */
  result = !!sPkt.iTag;
  printi("probe_rep", "has read %d bytes, returned %d", n, result);

  /* NOTE(review): if the reply is a checkpoint order, result is still
     derived from that reply's iTag - confirm against the daemon protocol */
  ckpt_check(ntohl(sPkt.iType));
  exit_function(&ProfileProbe);
  return result;
}

/**
 * Receive a message from the daemon (blocking)
 *
 * @param tag   message id (0 selects RECV_ANY_CONTROL, otherwise RECV_FROM_CHANNEL)
 * @param vpBuf ptr to the destination data
 * @param iSize the size of the vpBuf buffer
 * @param iFrom the remote node from where to receive something (must be -1)
 *
 * @return 0 (I/O errors end in qerror())
 */
int v_brecv(int tag, void *vpBuf, int iSize, int iFrom)
{
  pkt_header sPkt;
  uint32_t msgsize;
  int n;
  int readsize;

  enter_function(&nbcallRecv);
  ASSERT(iFrom == -1);

  /* the daemon may answer with a checkpoint order instead of a message
     header; in that case the checkpoint is handled and the request re-sent */
  for (;;) {
    memset(&sPkt, 0, sizeof(pkt_header));
    sPkt.iTag  = htonl(tag);
    sPkt.iSrc  = htonl(iFrom);
    sPkt.iDst  = htonl(MPID_MyWorldRank); /* dst is my rank */
    sPkt.iSize = htonl(iSize);            /* should be refilled by the server */
    /* specifies the receive type, RECV_ANY_CONTROL or RECV_FROM_CHANNEL */
    sPkt.iType = htonl(((tag == 0) ? RECV_ANY_CONTROL : RECV_FROM_CHANNEL));

    printi("brecv_req", "receive request : %s", format_packet(&sPkt));
    n = _uwrite(WTD, &sPkt, sizeof(pkt_header)); /* write the system packet */
    /* cast: with an unsigned sizeof, n == -1 would compare as SIZE_MAX */
    if (n < (int)sizeof(pkt_header))
      qerror("ERROR writing to pipe: %d bytes", n);

    n = _uread(RFD, &sPkt, sizeof(pkt_header));
    if (n < (int)sizeof(pkt_header))
      qerror("ERROR reading from pipe: %d bytes", n);
    printi("brecv_rep", "has got packet: %s", format_packet_long(&sPkt, 48));

    if (!ckpt_check(ntohl(sPkt.iType)))
      break;
  }

  msgsize = ntohl(sPkt.iSize);
  if (msgsize > (uint32_t)iSize) {
    /* NOTE(review): the excess bytes are not drained here - confirm the
       daemon only sends what was requested, or the stream desynchronises */
    printw("WARNING: message truncated (want %d should receive %d)", iSize, (int)msgsize);
    readsize = iSize;
  } else {
    readsize = (int)msgsize;
  }

  n = _uread(RFD, vpBuf, readsize); /* read the data itself */
  if (n < 0)
    qerror("ERROR reading from pipe");
  if ((n == 0) && (readsize != 0))
    qerror("connection closed by the daemon");
  printi("brecv", "finished reading, %d bytes", n);

  iLastRecvFrom = ntohl(sPkt.iSrc);
  exit_function(&ProfileRecv);
  return 0;
}

/**
 * @return The node from where we have received the last data
 */
int v_from(void)
{
  return iLastRecvFrom;
}

/** blocking send to daemon.
 *
 * Writes a header then iSize bytes of vpBuf, then waits for one reply header.
 *
 * @param tag   message id (0 selects SEND_CONTROL, otherwise SEND_CHANNEL)
 * @param vpBuf pointer to buffer to be sent
 * @param iSize number of bytes to be sent
 * @param iDest the destination node
 *
 * @return 0 (I/O errors end in qerror())
 */
int v_bsend(int tag, void *vpBuf, int iSize, int iDest)
{
  int n1, n2, n;
  pkt_header sPkt;
#ifdef DEBUG
  char strBuf[49*3];
#endif

  memset(&sPkt, 0, sizeof(pkt_header));
  enter_function(&nbcallSend);

  sPkt.iTag  = htonl(tag);
  sPkt.iSrc  = htonl(MPID_MyWorldRank);
  sPkt.iDst  = htonl(iDest);
  sPkt.iSize = htonl(iSize);
  /* specify the type of send, SEND_CONTROL or SEND_CHANNEL */
  sPkt.iType = htonl(((tag == 0) ? SEND_CONTROL : SEND_CHANNEL));

#ifdef DEBUG
  /* hex dump of at most the first 48 payload bytes */
  strBuf[0] = 0;
  for (n1 = 0; (n1 < 48) && (n1 < iSize); n1++)
    snprintf(strBuf + strlen(strBuf), 49*3 - strlen(strBuf), "%02x ",
             ((unsigned char *)vpBuf)[n1]);
  printi("bsend_req", "starts send (pipe %d) : %s [%s]", WTD, format_packet(&sPkt), strBuf);
#endif

  n1 = _uwrite(WTD, &sPkt, sizeof(pkt_header));
  if (n1 < (int)sizeof(pkt_header))
    qerror("ERROR writing to pipe: %d bytes", n1);
  n2 = _uwrite(WTD, vpBuf, iSize);
  if (n2 < iSize)
    qerror("ERROR writing data to pipe (%d)", n2);
  printi("bsend_rep", "send %d/%d bytes (usefull information %d)",
         n1 + n2, (int)(iSize + sizeof(pkt_header)), iSize);

  n = _uread(RFD, &sPkt, sizeof(pkt_header));
  if (n < (int)sizeof(pkt_header))
    qerror("ERROR waiting for rep from server (%d bytes)", n);

  ckpt_check(ntohl(sPkt.iType));
  exit_function(&ProfileSend);
  return 0;
}

/** checkpoint hook: close the daemon pipes and the debug log connection */
void VPreCheckpoint(void)
{
  printi("ckpt", "closing pipe to daemon");
  close(WTD);
  close(RFD);
  printi("ckpt", "closing debug message service connection");
  closeLog();
}

/** restart hook: restart the debug service, then reconnect to the daemon.
 *
 * Progress is also traced to /tmp/debug.log because the debug service is
 * itself being restarted here.
 */
void VPostCheckpoint(void)
{
  FILE *f = fopen("/tmp/debug.log", "a");

  if (f != NULL) {
    fprintf(f, "VPostCheckpoint : before initDebug(%s, NULL)\n", debug_uid);
    fflush(f);
  }
  initDebug(debug_uid, NULL);
  if (f != NULL) {
    fprintf(f, "VPostCheckpoint : initDebug did not SEGFAULT\n");
    fflush(f);
  }
  printi("ckpt", "Image reloaded, debug message service restarted, Reconnecting to v daemon");
  if (f != NULL) {
    fprintf(f, "VPostCheckpoint : printi didn't failed\n");
    fclose(f); /* previously leaked on every restart */
  }
  connect_my_daemon();
}

/**
 * removes parameters from an argv-like array
 *
 * @param argc [IN/OUT] number of strings in *argv, decreased by num on success
 * @param argv [IN/OUT] argv-like string array, compacted in place
 * @param from [IN] index of the first parameter to remove
 * @param num  [IN] number of parameters that have to be removed
 *
 * @return 1 if there are enough parameters to remove and 0 otherwise
 */
static int remove_param(int *argc, char **argv[], int from, int num)
{
  int i;

  if (*argc < from + num)
    return 0;

#ifdef DEBUG
  for (i = 0; i < num; i++)
    printi("param", "removing param %d : %s", from + i, (*argv)[from + i]);
#endif

  for (i = from; i + num < *argc; i++)
    (*argv)[i] = (*argv)[i + num];
  *argc -= num;

  /* keep the argv[argc] == NULL convention; with num > 0 this slot lies
     inside the original array */
  if (num > 0)
    (*argv)[*argc] = NULL;
  return 1;
}

/** Entry point of the library
 *
 * Looks for a "-vparam(group, size, rank, pipename[, debug])" argument,
 * initialises checkpointing and debugging from it, removes it from argv,
 * then connects to the daemon.
 *
 * @param argc [IN/OUT] parameter, hold the number of parameters
 * @param argv [IN/OUT] parameter, hold the parameters.
 * @return 0, or fail
 */
int v_init(int *argc, char **argv[])
{
  int i, nbmatch;
  char debug[128];
  char g[16], ws[16], wr[16];

#ifdef PROFILE
#warning PROFILE IS ACTIVATED
  atexit(display_profile);
#endif

  for (i = 1; i < *argc; i++) {
    g[0] = 0;
    ws[0] = 0;
    wr[0] = 0;
    PipeName[0] = 0;
    debug[0] = 0;
    /* field widths bound each conversion to its buffer (16 / 128 bytes);
       an over-long field fails to match instead of overflowing */
    nbmatch = sscanf((*argv)[i],
                     "-vparam(%15[^,], %15[^,], %15[^,], %127[^,], %127[^)])",
                     g, ws, wr, PipeName, debug);
    if ((nbmatch == 5) || (nbmatch == 4)) {
      GROUP = strtol(g, NULL, 0);
      MPID_MyWorldSize = strtol(ws, NULL, 0);
      MPID_MyWorldRank = strtol(wr, NULL, 0);

      /* This must be ASAP because of BLCR assuming that its static local_fd will have the same value
       * in parent and child.
       * More precisely, this must be before opening any file descriptor */
      printi("INIT", "initializing the checkpoint");
      if (vprivcheckpoint_init(GROUP, MPID_MyWorldRank) == -1)
        qerror("INIT: unable to initialize the checkpoint");

      snprintf(debug_uid, sizeof(debug_uid), "%d(device)", MPID_MyWorldRank);
      if (nbmatch == 5)
        initDebug(debug_uid, debug);
      else
        initDebug(debug_uid, "");

      remove_param(argc, argv, i, 1);
      break;
    }
  }

  for (i = 0; i < *argc; i++)
    printi("param", "MPI parameter[%d]: %s", i, (*argv)[i]);

  /* NOTE(review): relies on MPID_MyWorldSize being initialised to -1 where
     it is defined (not visible in this file) */
  if (MPID_MyWorldSize == -1) {
    initDebug("unknown(device)", "");
    printq("-vparam(group, size, rank, ipc, debug) argument is missing");
  }

  printi("INIT", "MPI CH_V device init : group = %d, worldsize = %d, rank = %d, PipeName = %s",
         GROUP, MPID_MyWorldSize, MPID_MyWorldRank, PipeName);

  connect_my_daemon();
  printi("INIT", "connected to the daemon on %d, %d", WTD, RFD);

  /* let this call just before return */
  v_gettimeofday(&tvClient, NULL);
  return 0;
}

/** Exit point of the library
 *
 * Sends CLIENT_EXIT, waits on one last read from the daemon, then closes
 * both pipes.
 *
 * @return 0 or fail...
 */
int v_finalize(void)
{
  pkt_header sPkt;
  int ret;

  /* iTag and iSize are not set below: zero the whole header so no
     uninitialised stack bytes are sent to the daemon */
  memset(&sPkt, 0, sizeof(pkt_header));
  sPkt.iSrc  = htonl(MPID_MyWorldRank);
  sPkt.iDst  = htonl(MPID_MyWorldRank);
  sPkt.iType = htonl(CLIENT_EXIT);

  printi("FINAL", "sending client_exit message");
  ret = _uwrite(WTD, &sPkt, sizeof(sPkt));
  ASSERT(ret == (int)sizeof(sPkt));

  printi("FINAL", "entering last read");
  _uread(RFD, &sPkt, sizeof(sPkt));
  printi("FINAL", "exiting last read because %s", strerror(errno));

  printi("FINAL", "closing daemon pipes");
  close(RFD);
  close(WTD);
  return 0;
}

/**
 * @return number of seconds (microsecond resolution) elapsed since v_init()
 */
double v_time(void)
{
  struct timeval tv;
  long long elapsed_us;

  v_gettimeofday(&tv, NULL);
  /* widen before multiplying: tv_sec * 1000000 overflows a 32-bit long */
  elapsed_us = (long long)(tv.tv_sec - tvClient.tv_sec) * 1000000LL
             + (long long)(tv.tv_usec - tvClient.tv_usec);
  return elapsed_us * 0.000001;
}

/**
 * @param t [OUT] resolution of the value returned by v_time(), in seconds
 */
void v_tick(double *t)
{
  *t = 0.000001;
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -