📄 select.c
字号:
/*MPICH-V2Copyright (C) 2002, 2003, 2004 Groupe Cluster et Grid, LRI, Universite de Paris SudThis file is part of MPICH-V2.MPICH-V2 is free software; you can redistribute it and/or modifyit under the terms of the GNU General Public License as published bythe Free Software Foundation; either version 2 of the License, or(at your option) any later version.MPICH-V2 is distributed in the hope that it will be useful,but WITHOUT ANY WARRANTY; without even the implied warranty ofMERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See theGNU General Public License for more details.You should have received a copy of the GNU General Public Licensealong with MPICH-V2; if not, write to the Free SoftwareFoundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA$Id: select.c,v 1.72 2004/12/02 12:39:20 herault Exp $*//** @file select.c implements the main select loop of the daemon */#include <unistd.h>extern char *get_current_dir_name(void); /** should be in unistd.h, but... */#include <fcntl.h>#include <stdio.h>#include <sys/types.h>#include <sys/socket.h>#include <sys/un.h>#include <sys/time.h>#include <netinet/in.h>#include <netinet/tcp.h>#include <arpa/inet.h>#include <netdb.h>#include <sys/wait.h>#include "config.h"#include "debug.h"#include "simple_list.h"#include "utils_socket.h"#include "vtypes.h"#include "select.h"#include "checkpoint.h"#include "v2run_msg.h"#include "connect.h"#include "flitvec.h"#include "flitvecio.h"#include "otherdaemoncomm.h"#include "daemoncom.h"#include "localmpi.h"#include "daemonCsched.h"#include "elClient.h"#include "checkpoint.h"static int myrank; /**< local mpi process's rank */static int mygroup; /**< local mpi process's group */static int scon = -1; /**< listen-type socket to accept connection */static int smpi_read = -1; /**< read pipe between local mpi process and comunication daemon (mpi process write on it, the daemon reads on it)*/static int smpi_write = -1; /**< write pipe between local mpi process and comunication daemon (mpi process reads on it, the daemon write on it)*/static int sockEL = -1; /**< Event Logger TCP socket */static int sockCSCHED = -1; /**< Checkpoint Scheduler TCP socket */static int disp_fd = -1; /**< socket to dispatcher */static struct sockaddr_in csip; /**< address of the checkpoint server */static struct sockaddr_in elip; /**< address of the event logger */static struct sockaddr_in cschedip;/**< address of the checkpoint scheduler *//** information about socket and message to other mpipeer */static int sizeofmpipeer; /**< size of mpipeer : number of mpi processes to be connected */static SockInfo *mpipeer; /**< Socket informations for all mpi process connected, refered by their rank */#define cin pckpt_getCkptInfo()static int AllPeerHasFinalized; /**< 1 if the dispatcher told us that all peer (including us) has finalized */static int IHaveFinalized; /**< 1 if this rank has finalized */static int ChildIsDead; /**< 1 if the child is dead after IHaveFinalized */static int recv_line(int s, char *line, int maxlen){ char c; int i; for(i = 0; i < maxlen; i++) { if(recv(s, &c, 1, 0) != 1) return -1; line[i] = c; if(c == '\n') { line[i] = 0; return 0; } } line[i] = 0; return 1;}int recv_intparam(int s, char *name){ char line[1024]; if( recv_line(s, line, 1023) < 0 ) { perror("receive intparam"); close(s); exit(1); } if( !strncmp(line, name, strlen(name)) ) return strtol( line + strlen(name) + 1, NULL, 0); fprintf(stderr, "unexpected parameter: %s\n", line); exit(2);}char *recv_strparam(int s, char *name){ char line[1024]; char *value; if( recv_line(s, line, 1023) < 0 ) { perror("receive strparam"); close(s); exit(1); } if( !strncmp(line, name, strlen(name)) ) { value = (char*)calloc(1, strlen(line) - strlen(name) ); memcpy(value, line+strlen(name)+1, strlen(line)-strlen(name)-1); return value; } exit(2);}void init_dispatcher (int sock_to_dispatcher) { struct sockaddr_in *nodeListArray; int rank, i; char *ip; int port; disp_fd = sock_to_dispatcher; printi("init", "connected with dispatcher on %d", disp_fd); nodeListArray = (struct sockaddr_in *) malloc (sizeofmpipeer * sizeof (struct sockaddr_in)); /* Here we receive the list of nodes in an array */ for(i = 0; i < sizeofmpipeer; i++) { rank = recv_intparam(sock_to_dispatcher, "node_rank"); if(rank != i) qerror("unable to receive rank or bad rank reception (%d while waiting for %d) when receiving the node array from dispatcher", rank, i); ip = recv_strparam(sock_to_dispatcher, "node_ip"); if(ip == NULL) qerror("unable to receive %d's ip when receiving the node array from dispatcher", i); port = recv_intparam(sock_to_dispatcher, "node_port"); nodeListArray[i].sin_family = AF_INET; nodeListArray[i].sin_port = htons(port); inet_aton(ip, &nodeListArray[i].sin_addr); free(ip); } free( recv_strparam(sock_to_dispatcher, "EOF") ); /* With this we can fill the array with the mpi peers */ printi("init", "parsing config array"); parse_config_array (nodeListArray, mpipeer); free (nodeListArray);}void initstruct (int group, int rank, int np, struct sockaddr_in el, struct sockaddr_in cs, struct sockaddr_in sc, int use_local){ myrank = rank; mygroup = group; sizeofmpipeer = np; memcpy(&elip, &el, sizeof (struct sockaddr_in)); memcpy(&cschedip, &sc, sizeof (struct sockaddr_in)); mpipeer = initMpipeer(np, myrank); /* initialize the socket information structure */ ftp_init(np, rank); /* initialize the FT structure */ initCommunication(sizeofmpipeer); /* initialize comm buffer */ sockCSCHED = ftp_init_csched(&cschedip, myrank, sizeofmpipeer); if(cs.sin_port != (short)0) { memcpy(&csip, &cs, sizeof (struct sockaddr_in)); if(pckpt_init((struct sockaddr *)&csip, use_local, group, rank, np) < 0) printq("Unable to initialize checkpoint structures %d:%d", group, rank); } else { csip.sin_port = (short)0; if(pckpt_init(NULL, use_local, group, rank, np) < 0) printq("Unable to initialize checkpoint structures %d:%d", group, rank); }}void initcomm (int RFmpi, int WTmpi, int _scon, int sock_to_dispatcher){ smpi_read = RFmpi; smpi_write = WTmpi; scon = _scon; printi ("init", "connect to el : %s:%u", inet_ntoa(elip.sin_addr), ntohs (elip.sin_port)); sockEL = ftp_el_connect (&elip); ftp_el_get_reex (sockEL); init_dispatcher (sock_to_dispatcher); init_connect_to_MPI(mpipeer, myrank); printi("init", "init comm finished");}void close_all_sockets(){ close_all_daemon_sockets(mpipeer); if (smpi_read >= 0) close (smpi_read); if (smpi_write >= 0) close (smpi_write); if (scon >= 0) close (scon); if (sockCSCHED >= 0) close(sockCSCHED); if (disp_fd >= 0) close(disp_fd);}/** get the fd_set for the select loop * @param readset : set of sockets to read from * @param writeset : set of sockets to write into */static inline int getallfdsets(fd_set * readset, fd_set * writeset){ int i; int max = 0;#define FD_SETm(s, set) do {FD_SET(s, set);if(s > max) max = s;} while(0)/*** Local mpi process */ if(!ChildIsDead) FD_SETm(smpi_read, readset);/*** peer sockets */ /* construction of the read_set : listening all */ for(i = 0; i < sizeofmpipeer; i++) { if(mpipeer[i].fd < 0) /* no connection to peer i */ { if(mpipeer[i].cofd >= 0) { /* add connecting socket (cofd) */ if(mpipeer[i].costate == CO_COWAITING) FD_SETm(mpipeer[i].cofd, writeset); else FD_SETm(mpipeer[i].cofd, readset); continue; } continue; } if( ftp_msgrecv_possible(mpipeer, i) ) FD_SETm(mpipeer[i].fd, readset); } /*** server sockets */ if (sockEL != -1) { FD_SETm(sockEL , readset); if(ftp_getELfdsets()) FD_SET(sockEL, writeset); } if (sockCSCHED != -1) { FD_SETm(sockCSCHED , readset); if(ftp_getcschedfdset()) FD_SET(sockCSCHED, writeset); } /* Dispatcher */ if(disp_fd != -1) FD_SETm(disp_fd, readset); /* checkpoint communications */ if(pckpt_rfdset()) FD_SETm(cin->pipe, readset); /* protocol channel should never be a bottleneck as it should be of negligible size * in comparison to data channel. If data channel becomes available it * is reasonable to consider that protocol channel is but not the contrary. */ if(pckpt_wfdset()) { if(cin->sock.data != -1) FD_SETm(cin->sock.data, writeset); if(cin->sock.file != -1) FD_SETm(cin->sock.file, writeset); } /*** Connexion managment sockets */ /* Connexion to other computing peers managment */ /** @todo may change: if everything is connected, don't add * Aur閘ien>> This has not to be changed: TCP timeout could have occured for dispacher before * this daemon detected failure of the reconnecting peer. The connexion protocol * ensure that there is always a connexion as long as there are a mean for * dispatcher to detect faults (even if computing nodes are not able to detect faults). * not replacing non failed peers by newly relaunched one should lead to * global inconsistent state from dispatcher point of view. This should be avoided * as it means only that something goes wrong */ FD_SETm(scon, readset); /* processes that are connecting */ max = getconnectingfdset(readset, max);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -