⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 select.c

📁 Path MPICH-V for MPICH the MPI Implementation
💻 C
📖 第 1 页 / 共 2 页
字号:
/*MPICH-V2Copyright (C) 2002, 2003, 2004 Groupe Cluster et Grid, LRI, Universite de Paris SudThis file is part of MPICH-V2.MPICH-V2 is free software; you can redistribute it and/or modifyit under the terms of the GNU General Public License as published bythe Free Software Foundation; either version 2 of the License, or(at your option) any later version.MPICH-V2 is distributed in the hope that it will be useful,but WITHOUT ANY WARRANTY; without even the implied warranty ofMERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See theGNU General Public License for more details.You should have received a copy of the GNU General Public Licensealong with MPICH-V2; if not, write to the Free SoftwareFoundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA$Id: select.c,v 1.72 2004/12/02 12:39:20 herault Exp $*//** @file select.c implements the main select loop of the daemon */#include <unistd.h>extern char *get_current_dir_name(void); /** should be in unistd.h, but... */#include <fcntl.h>#include <stdio.h>#include <sys/types.h>#include <sys/socket.h>#include <sys/un.h>#include <sys/time.h>#include <netinet/in.h>#include <netinet/tcp.h>#include <arpa/inet.h>#include <netdb.h>#include <sys/wait.h>#include "config.h"#include "debug.h"#include "simple_list.h"#include "utils_socket.h"#include "vtypes.h"#include "select.h"#include "checkpoint.h"#include "v2run_msg.h"#include "connect.h"#include "flitvec.h"#include "flitvecio.h"#include "otherdaemoncomm.h"#include "daemoncom.h"#include "localmpi.h"#include "daemonCsched.h"#include "elClient.h"#include "checkpoint.h"static int myrank;  /**< local mpi process's rank */static int mygroup; /**< local mpi process's group */static int scon = -1;	    /**< listen-type socket to accept connection */static int smpi_read = -1;  /**< read pipe between local mpi process and comunication daemon (mpi process write on it, the daemon reads on it)*/static int smpi_write = -1;  /**< write pipe between local mpi process and comunication daemon (mpi process reads on it, the daemon write on it)*/static int sockEL = -1;	    /**< Event Logger TCP socket */static int sockCSCHED = -1; /**< Checkpoint Scheduler TCP socket */static int disp_fd = -1;    /**< socket to dispatcher */static struct sockaddr_in csip;    /**< address of the checkpoint server */static struct sockaddr_in elip;    /**< address of the event logger */static struct sockaddr_in cschedip;/**< address of the checkpoint scheduler *//** information about socket and message to other mpipeer */static int sizeofmpipeer;  /**< size of mpipeer : number of mpi processes to be connected */static SockInfo *mpipeer;  /**< Socket informations for all mpi process connected, refered by their rank */#define cin pckpt_getCkptInfo()static int AllPeerHasFinalized;  /**< 1 if the dispatcher told us that all peer (including us) has finalized */static int IHaveFinalized;       /**< 1 if this rank has finalized */static int ChildIsDead;          /**< 1 if the child is dead after IHaveFinalized */static int recv_line(int s, char *line, int maxlen){  char c;  int i;  for(i = 0; i < maxlen; i++)    {      if(recv(s, &c, 1, 0) != 1)	return -1;      line[i] = c;      if(c == '\n')	{	  line[i] = 0;	  return 0;	}    }  line[i] = 0;  return 1;}int recv_intparam(int s, char *name){  char line[1024];  if( recv_line(s, line, 1023) < 0 )    {      perror("receive intparam");      close(s);      exit(1);    }  if( !strncmp(line, name, strlen(name)) )    return strtol( line + strlen(name) + 1, NULL, 0);  fprintf(stderr, "unexpected parameter: %s\n", line);  exit(2);}char *recv_strparam(int s, char *name){  char line[1024];  char *value;  if( recv_line(s, line, 1023) < 0 )    {      perror("receive strparam");      close(s);      exit(1);    }  if( !strncmp(line, name, strlen(name)) )    {      value = (char*)calloc(1, strlen(line) - strlen(name) );      memcpy(value, line+strlen(name)+1, strlen(line)-strlen(name)-1);      return value;    }  exit(2);}void init_dispatcher (int sock_to_dispatcher) {  struct sockaddr_in *nodeListArray;  int rank, i;  char *ip;  int port;  disp_fd = sock_to_dispatcher;  printi("init", "connected with dispatcher on %d", disp_fd);  nodeListArray =    (struct sockaddr_in *) malloc (sizeofmpipeer *				   sizeof (struct sockaddr_in));  /* Here we receive the list of nodes in an array */  for(i = 0; i < sizeofmpipeer; i++)    {      rank = recv_intparam(sock_to_dispatcher, "node_rank");      if(rank != i)	qerror("unable to receive rank or bad rank reception (%d while waiting for %d) when receiving the node array from dispatcher",	       rank, i);      ip = recv_strparam(sock_to_dispatcher, "node_ip");      if(ip == NULL)	qerror("unable to receive %d's ip when receiving the node array from dispatcher", i);      port = recv_intparam(sock_to_dispatcher, "node_port");      nodeListArray[i].sin_family = AF_INET;      nodeListArray[i].sin_port = htons(port);      inet_aton(ip, &nodeListArray[i].sin_addr);      free(ip);    }  free( recv_strparam(sock_to_dispatcher, "EOF") );  /* With this we can fill the array with the mpi peers */  printi("init", "parsing config array");  parse_config_array (nodeListArray, mpipeer);  free (nodeListArray);}void initstruct (int group, int rank, int np, struct sockaddr_in el, struct sockaddr_in cs, struct sockaddr_in sc, int use_local){  myrank = rank;  mygroup = group;  sizeofmpipeer = np;  memcpy(&elip, &el, sizeof (struct sockaddr_in));   memcpy(&cschedip, &sc, sizeof (struct sockaddr_in));  mpipeer = initMpipeer(np, myrank);  /* initialize the socket information structure */  ftp_init(np, rank);                 /* initialize the FT structure */  initCommunication(sizeofmpipeer);   /* initialize comm buffer */   sockCSCHED = ftp_init_csched(&cschedip, myrank, sizeofmpipeer);  if(cs.sin_port != (short)0)  {    memcpy(&csip, &cs, sizeof (struct sockaddr_in));    if(pckpt_init((struct sockaddr *)&csip, use_local, group, rank, np) < 0) printq("Unable to initialize checkpoint structures %d:%d", group, rank);  }  else  {    csip.sin_port = (short)0;     if(pckpt_init(NULL, use_local, group, rank, np) < 0) printq("Unable to initialize checkpoint structures %d:%d", group, rank);  }}void initcomm (int RFmpi, int WTmpi, int _scon, int sock_to_dispatcher){  smpi_read = RFmpi;  smpi_write = WTmpi;  scon = _scon;  printi ("init", "connect to el : %s:%u", inet_ntoa(elip.sin_addr),	  ntohs (elip.sin_port));  sockEL = ftp_el_connect (&elip);  ftp_el_get_reex (sockEL);  init_dispatcher (sock_to_dispatcher);  init_connect_to_MPI(mpipeer, myrank);  printi("init", "init comm finished");}void close_all_sockets(){  close_all_daemon_sockets(mpipeer);  if (smpi_read >= 0)    close (smpi_read);  if (smpi_write >= 0)    close (smpi_write);  if (scon >= 0)    close (scon);  if (sockCSCHED >= 0)    close(sockCSCHED);  if (disp_fd >= 0)    close(disp_fd);}/** get the fd_set for the select loop * @param readset : set of sockets to read from * @param writeset : set of sockets to write into */static inline int getallfdsets(fd_set * readset, fd_set * writeset){  int i;  int max = 0;#define FD_SETm(s, set) do {FD_SET(s, set);if(s > max) max = s;} while(0)/*** Local mpi process */  if(!ChildIsDead)    FD_SETm(smpi_read, readset);/*** peer sockets */  /* construction of the read_set : listening all */  for(i = 0; i < sizeofmpipeer; i++)  {    if(mpipeer[i].fd < 0) /* no connection to peer i */      {        if(mpipeer[i].cofd >= 0)          {            /* add connecting socket (cofd) */            if(mpipeer[i].costate == CO_COWAITING)               FD_SETm(mpipeer[i].cofd, writeset);            else               FD_SETm(mpipeer[i].cofd, readset);            continue;          }        continue;      }    if( ftp_msgrecv_possible(mpipeer, i) )      FD_SETm(mpipeer[i].fd, readset);  }  /*** server sockets */  if (sockEL != -1)  {    FD_SETm(sockEL , readset);    if(ftp_getELfdsets())       FD_SET(sockEL, writeset);  }  if (sockCSCHED != -1)  {    FD_SETm(sockCSCHED , readset);    if(ftp_getcschedfdset())       FD_SET(sockCSCHED, writeset);  }  /* Dispatcher */  if(disp_fd != -1)     FD_SETm(disp_fd, readset);  /*  checkpoint communications */  if(pckpt_rfdset()) FD_SETm(cin->pipe, readset);  /*  protocol channel should never be a bottleneck as it should be of negligible size   *  in comparison to data channel. If data channel becomes available it   *  is reasonable to consider that protocol channel is but not the contrary.   */  if(pckpt_wfdset())   {    if(cin->sock.data != -1) FD_SETm(cin->sock.data, writeset);    if(cin->sock.file != -1) FD_SETm(cin->sock.file, writeset);  }  /*** Connexion managment sockets */  /* Connexion to other computing peers managment */  /** @todo may change: if everything is connected, don't add   * Aur閘ien>> This has not to be changed: TCP timeout could have occured for dispacher before   *   this daemon detected failure of the reconnecting peer. The connexion protocol   *   ensure that there is always a connexion as long as there are a mean for   *   dispatcher to detect faults (even if computing nodes are not able to detect faults).   *   not replacing non failed peers by newly relaunched one should lead to   *   global inconsistent state from dispatcher point of view. This should be avoided   * as it means only that something goes wrong   */  FD_SETm(scon, readset);  /* processes that are connecting */  max = getconnectingfdset(readset, max);

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -