⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 vpriv.c

📁 Path MPICH-V for MPICH the MPI Implementation
💻 C
字号:
/** @file vpriv.c implements the driver functions */
/*
  MPICH-V
  Copyright (C) 2002, 2003 Groupe Cluster et Grid, LRI, Universite de Paris Sud

  This file is part of MPICH-V.

  MPICH-V is free software; you can redistribute it and/or modify
  it under the terms of the GNU General Public License as published by
  the Free Software Foundation; either version 2 of the License, or
  (at your option) any later version.

  MPICH-V is distributed in the hope that it will be useful,
  but WITHOUT ANY WARRANTY; without even the implied warranty of
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  GNU General Public License for more details.

  You should have received a copy of the GNU General Public License
  along with MPICH-V; if not, write to the Free Software
  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA

  $Id: vpriv.c,v 1.41 2006/01/24 19:35:01 rodrigue Exp $
*/

#include "config.h"          /* for configuration (lvl of debugging, checkpoint, etc...) */
#include "debug.h"           /* for debugging            */
#include "utils_socket.h"    /* for _uread and _uwrite  */
#include "vprivcheckpoint.h" /* for checkpointing hooks */
#include "vtypes.h"

#include <arpa/inet.h>       /* htonl, ntohl */
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <sys/time.h>
#include <unistd.h>
#include <stdio.h>           /* FILE, fopen, fprintf, snprintf */
#include <string.h>          /* for mem functions */
#include <signal.h>
#include <errno.h>
#include <stdlib.h>

static char PipeName[128];               /**< basename of the pipe files */
static int WTD;                          /**< file descriptor of pipe to write to  */
static int RFD;                          /**< file descriptor of pipe to read from */
static int iLastRecvFrom;                /**< rank of last reception */
static int GROUP;                        /**< my group */
extern int MPID_MyWorldSize;             /**< my world size */
extern int MPID_MyWorldRank;             /**< my rank */
static char debug_uid[32];               /**< identifier passed to initDebug() */

#if 0
#include <asm/unistd.h>
/* Disabled: raw i386 gettimeofday system call issued with "int $0x80",
 * i.e. without going through libc. */
static int v_gettimeofday(struct timeval *a1, struct timezone *a2)
{
  long res;
  __asm__ volatile ("int $0x80"
                    : "=a" (res)
                    : "0" (__NR_gettimeofday), "b" ((long)(a1)),
                      "c" ((long)(a2)));
  if ((unsigned long)(res) >= (unsigned long)(-125)) {
    errno = -(res);
    res = -1;
  }
  return (int) (res);
}
#else
#warning "Using libc gettimeofday. Expect timing errors with condor!"
#define v_gettimeofday gettimeofday
#endif

static struct timeval tvClient = {0,0};  /**< time of the client start*/

#ifdef PROFILE
static struct timeval ProfileProbe;
static int nbcallProbe;
static struct timeval ProfileSend;
static int nbcallSend;
static struct timeval ProfileRecv;
static int nbcallRecv;
static struct timeval ProfileTV;         /**< start time of the call being profiled */

/** Counts one more call in *nb and records its start time in ProfileTV. */
static inline void enter_function(int *nb)
{
  (*nb)++;
  gettimeofday(&ProfileTV, NULL);
}

/** Adds the time elapsed since the last enter_function() to the accumulator *tv. */
static inline void exit_function(struct timeval *tv)
{
  struct timeval now;
  gettimeofday(&now, NULL);
  timersub(&now, &ProfileTV, &now);
  timeradd(&now, tv, tv);
}

/** Logs the call count, cumulated time and mean time per call of one profiled
 *  entry point ("probe", "send" or "recv").
 *  tv_sec/tv_usec are cast to long for %ld (their types are not int), and the
 *  microseconds are zero-padded so that 1.000050 s is not printed as "1.50". */
static void display_profile_entry(const char *name, int nbcall, const struct timeval *total)
{
  double seconds = (double)total->tv_sec + (double)total->tv_usec / 1000000.0;
  /* an entry point that was never called has no meaningful average */
  double average = (nbcall > 0) ? seconds / (double)nbcall : 0.0;

  printp("nb%s : %d, time%s : %ld.%06ld, average time%s : %gs",
         name, nbcall, name, (long)total->tv_sec, (long)total->tv_usec,
         name, average);
}

/** Prints the profiling counters; registered with atexit() by v_init(). */
static void display_profile(void)
{
  display_profile_entry("probe", nbcallProbe, &ProfileProbe);
  display_profile_entry("send", nbcallSend, &ProfileSend);
  display_profile_entry("recv", nbcallRecv, &ProfileRecv);
}
#else
#define enter_function(a) do {} while(0)
#define exit_function(a) do {} while(0)
#define display_profile() do {} while(0)
#endif

/** try to perform a checkpoint if activated
 * @param iType: the iType field of a returning sPkt structure from the daemon.
* @return 1 iff a checkpoing was performed */static inline int ckpt_check(int iType){  pkt_header sPkt;   printi("ckpt", "ckpt_check iType=%d",iType);  if( iType == CP_START )    {		          printi("ckpt", "checkpoint ordered ");      if(vprivcheckpoint_performcheckpoint(0) == -1)	{	  printe("ERROR sending checkpoint");	  return -1;	}      return 1;    }  if( iType == CP_START_NO_FORK )    {      printi("ckpt", "sequential checkpoint ordered");      if (vprivcheckpoint_performcheckpoint(1) == -1)	{	  printe("ERROR sending checkpoint");	  return -1;	}      _uread( RFD, &sPkt, sizeof(sPkt));      if ( ntohl(sPkt.iType)== DEVICE_EXIT )        {          printi("ckpt", "Exit (crash) ordered");          close(RFD);          close(WTD);                    exit(0);        }           return 1;    }  return 0;}/** establishes a connection with the daemon. *  put the correct value in PipeName before. * */static void connect_my_daemon(void){  char filename[strlen(PipeName)+16];  struct stat s;  sprintf(filename, "%s.Daemon2Driver", PipeName);    printi("init", "opening %s for reading", filename);  RFD = open(filename, O_RDONLY);  if(RFD < 0)    qerror("ERROR opening the pipe from the daemon");  sprintf(filename, "%s.Driver2Daemon", PipeName);   printi("init", "opening %s for writing", filename);  WTD = open(filename, O_WRONLY);  if(WTD < 0)    qerror("ERROR opening the pipe to the daemon");}/** probes the presence of messages * @param tag message id * * @return 0 if no message are available on the memory-channel and 1 otherwise *           result is in iTag */int v_probe(int tag){  /*send a system request to the server and receive an info about the control message list is empty*/  int n=0, result;  pkt_header sPkt;#ifdef DEBUG  memset(&sPkt, 0, sizeof(pkt_header));#endif  enter_function(&nbcallProbe);  sPkt.iTag  = htonl(tag);  sPkt.iSrc  = htonl(-1);               /* any src */  sPkt.iDst  = htonl(MPID_MyWorldRank); /*dst is my rank*/  sPkt.iSize = 0;                       
/*just clear this field*/  sPkt.iType = htonl(ANY_MSG_AVAIL);  printi("probe_req", "PROBE starts...");  n = _uwrite( WTD, &sPkt, sizeof(pkt_header) );  if(n<=0)     qerror("ERROR writing to pipe");  printi("probe_req", "probe has sent packet %s", format_packet_long(&sPkt, 48));  n = _uread( RFD, &sPkt, sizeof(pkt_header) );  if(n<0)     qerror("ERROR reading from pipe");  if(n==0)     qerror("connection closed by the daemon");  /* result should be a number equal to or greater than 0. It means the number of control messages avail. */  /* return only 0 or 1 */  result = !(!(sPkt.iTag));  printi("probe_rep", "has read %d bytes, returned %d", n, result);  ckpt_check(ntohl(sPkt.iType));  exit_function(&ProfileProbe);  return result;}/** * Recieve a message from daemon (blocking) * * @param tag message id * @param vpBuf ptr to the destination data * @param iSize the size of the vpBuf buffer * @param iFrom the remote node from where to receive something * * @return 0 or fail (0 is OK) */int v_brecv(int tag, void *vpBuf, int iSize, int iFrom){  pkt_header sPkt;  int n;  int readsize;#ifdef DEBUG  memset(&sPkt, 0, sizeof(pkt_header));#endif  enter_function(&nbcallRecv);  ASSERT(iFrom==-1); checkpoint_before_reception:  sPkt.iTag  = htonl(tag);  sPkt.iSrc  = htonl(iFrom);  sPkt.iDst  = htonl(MPID_MyWorldRank); /* dst is my rank */  sPkt.iSize = htonl(iSize);            /* should be refilled by the server */  /* specifies the receive type, RECV_ANY_CONTROL or RECV_FROM_CHANNEL */  sPkt.iType = htonl(((tag==0) ? 
RECV_ANY_CONTROL : RECV_FROM_CHANNEL));  printi( "brecv_req", "receive request : %s", format_packet(&sPkt));  n = _uwrite( WTD, &sPkt, sizeof(pkt_header));   /*write the system packet*/  if( n < sizeof(pkt_header))    qerror("ERROR writing to pipe: %d bytes", n);  n = _uread( RFD, &sPkt, sizeof(pkt_header));  if( n < sizeof(pkt_header) )    qerror("ERROR reading from pipe: %d bytes", n);  printi( "brecv_rep" ,"has got packet: %s", format_packet_long(&sPkt, 48));  if( ckpt_check( ntohl(sPkt.iType) ) )    goto checkpoint_before_reception;  if( ntohl(sPkt.iSize) > iSize )    printw( "WARNING: message truncated (want %d should receive %d)", iSize, ntohl(sPkt.iSize) );  readsize = ((iSize < ntohl(sPkt.iSize)) ? iSize : ntohl(sPkt.iSize) );  n = _uread( RFD, vpBuf, readsize ); /*read the data itself*/  if( n < 0 )     qerror( "ERROR reading from pipe");  if( (n == 0) && (readsize != 0) )     qerror("connection closed by the daemon");  printi("brecv", "finished reading, %d bytes", n);  iLastRecvFrom = ntohl(sPkt.iSrc);  exit_function(&ProfileRecv);  return 0;}/** * @return The node from where we have received the last data */int v_from(){    return iLastRecvFrom;}/** blocking send to daemon. * * @param tag message id * @param vpBuf pointer to buffer to be send * @param iSize number of bytes to be send * @param iDest the destination node * * @return 0 (0 is OK, fail if could not return 0) */int v_bsend(int tag, void *vpBuf, int iSize, int iDest){    int n1, n2, n;    pkt_header sPkt;#ifdef DEBUG    char strBuf[49*3];    memset(&sPkt, 0, sizeof(pkt_header));#endif    enter_function(&nbcallSend);    sPkt.iTag  = htonl(tag);    sPkt.iSrc  = htonl(MPID_MyWorldRank);    sPkt.iDst  = htonl(iDest);    sPkt.iSize = htonl(iSize);    /* specify the type of send, SEND_CONTROL or SEND_CHANNEL */    sPkt.iType = htonl(((tag==0) ? 
SEND_CONTROL : SEND_CHANNEL));#ifdef DEBUG    strBuf[0] = 0;    for(n1 = 0; (n1 < 48) && (n1 < iSize); n1++)      snprintf(strBuf+strlen(strBuf), 49*3-strlen(strBuf), "%02x ", ((unsigned char*)vpBuf)[n1]);    printi( "bsend_req", "starts send (pipe %d) : %s [%s]", WTD, format_packet(&sPkt), strBuf );#endif    n1 = _uwrite( WTD, &sPkt, sizeof(pkt_header));    if( n1 < sizeof(pkt_header) )      qerror("ERROR writing to pipe: %d bytes", n1);    n2 = _uwrite( WTD, vpBuf, iSize );    if( n2 < iSize )      qerror( "ERROR writing data to pipe (%d)", n2);    printi( "bsend_rep", "send %d/%d bytes (usefull information %d)",	    n1+n2, iSize + sizeof(pkt_header), iSize );    n = _uread( RFD, &sPkt, sizeof(pkt_header));    if( n < sizeof(pkt_header) )      qerror( "ERROR waiting for rep from server (%d bytes)", n);    ckpt_check( ntohl(sPkt.iType) );    exit_function(&ProfileSend);    return 0;}void VPreCheckpoint(void){  printi("ckpt", "closing pipe to daemon");  close(WTD);  close(RFD);  printi("ckpt", "closing debug message service connection");  closeLog();}void VPostCheckpoint(void){  FILE *f;  f = fopen("/tmp/debug.log", "a");  fprintf(f, "VPostCheckpoint : before initDebug(%s, NULL)\n", debug_uid?debug_uid:"(null)");  fflush(f);  initDebug(debug_uid, NULL);  fprintf(f, "VPostCheckpoint : initDebug did not SEGFAULT\n");  fflush(f);  printi("ckpt", "Image reloaded, debug message service restarted, Reconnecting to v daemon");  fprintf(f, "VPostCheckpoint : printi didn't failed\n");  connect_my_daemon();}/** * removes parameters * * @param ipArgc [IN/OUT] number of strings in the cpppArgv * @param cpppArgv [IN/OUT] argv like string array * @param num [IN] number of parameters that have to be removed * * @return 1 if there are enough parameters to remove and 0 otherwise */static int remove_param(int *argc, char **argv[], int from, int num){  int i;    if(*argc < from + num) return 0;    for(i = 0; i < *argc - from - num; i++)    {#ifdef DEBUG      if(i < num) 
printi("param", "removing param %d : %s", from + i, (*argv)[from + i]);#endif      (*argv)[from + i] = (*argv)[from + num + i];    }  *argc -= num;  return 1;}/** Entry point of the library * * @param argc [IN/OUT] parameter, hold the number of parameters * @param argv [IN/OUT] parameter, hold the parameters. * @return 0, or fail */int v_init(int* argc, char **argv[]){  int i, reste, nbmatch;  int  fd1, fd2;  char debug[128];  char g[16], ws[16], wr[16]; #ifdef PROFILE#warning PROFILE IS ACTIVATED  atexit(display_profile);#endif    for( i = 1; i < *argc; i++){    g[0] = 0;    ws[0] = 0;    wr[0] = 0;    PipeName[0] = 0;    debug[0] = 0;        nbmatch = sscanf((*argv)[i], "-vparam(%[^,], %[^,], %[^,], %[^,], %[^)])",		     g, ws, wr, PipeName, debug);     if((nbmatch == 5) || (nbmatch == 4))      {        GROUP = strtol(g, NULL, 0);        MPID_MyWorldSize = strtol(ws, NULL, 0);        MPID_MyWorldRank = strtol(wr, NULL, 0);	/* This must be ASAP because of BLCR assuming that its static local_fd will have the same value	 * in parent and child.	 
* More precisely, this must be before opening any file descriptor	 */	printi("INIT", "initializing the checkpoint");	if(vprivcheckpoint_init(GROUP, MPID_MyWorldRank) == -1)	  qerror("INIT: unable to initialize the checkpoint");	        sprintf(debug_uid, "%d(device)", MPID_MyWorldRank);	        if(nbmatch == 5)          {            initDebug(debug_uid, debug);          }	else          {            initDebug(debug_uid, "");	  }        remove_param( argc, argv, i--, 1);        break;     }  }  for(i = 0; i < *argc; i++) printi("param", "MPI parameter[%d]: %s", i, (*argv)[i]);    if(MPID_MyWorldSize == -1)     {      initDebug("unknown(device)", "");      printq("-vparam(group, rank, size, ipc, debug) argument is missing");    }    printi("INIT", "MPI CH_V device init : group = %d, worldsize = %d, rank = %d, PipeName = %s",	 GROUP, MPID_MyWorldSize, MPID_MyWorldRank, PipeName);    connect_my_daemon();  printi("INIT", "connected to the daemon on %d, %d", WTD, RFD);    /* let this call just before return */  v_gettimeofday( &tvClient, NULL );  return 0;}/** Exit point of the library *  @return 0 or fail... 
*/int v_finalize(void){  pkt_header sPkt;  fd_set unixsock;  char buff;  int ret;#ifdef DEBUG  memset(&sPkt, 0, sizeof(pkt_header));#endif  sPkt.iSrc  = htonl(MPID_MyWorldRank);  sPkt.iDst  = htonl(MPID_MyWorldRank);  sPkt.iType = htonl(CLIENT_EXIT);  printi("FINAL", "sending client_exit message");  ret = _uwrite( WTD, &sPkt, sizeof(sPkt) );  ASSERT(ret == sizeof(sPkt));  printi("FINAL", "entering last read");  _uread( RFD, &sPkt, sizeof(sPkt));  printi("FINAL", "exiting last read because %s", strerror(errno));  printi("FINAL","closing daemon pipes");  close(RFD);  close(WTD);  return 0;}/** * @return number of microsecond from the begining of the execution */double v_time(){    struct timeval tv;    double time = 0.0;    v_gettimeofday(&tv,NULL);    time = ((tv.tv_sec*1000000 + tv.tv_usec) - (tvClient.tv_sec*1000000 + tvClient.tv_usec))*0.000001;    return time;}/** * @param t [OUT] unit of the time returned by the previsious function (in seconds) */void v_tick(double *t){    *t = 0.000001;}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -