⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 connect.c

📁 Path MPICH-V for MPICH the MPI Implementation
💻 C
字号:
/** @file connect.c implements the connection methods to other peer */#include "config.h"#include "debug.h"#include "connect.h"#include "utils_socket.h"#include "daemoncom.h"#include <stdlib.h>#include <unistd.h>#include <sys/types.h>#include <sys/socket.h>#include <netinet/in.h>#include <netinet/tcp.h>#include <arpa/inet.h>#include <fcntl.h>/** implement the connection functions between daemons */static int *acceptlist; /**< List of waiting for accept socket */static int acceptlistsize = 0;static int WorldSize = -1; /**< size of the world */static int myrank = -1; /**< my position in the world */void on_connect(SockInfo mpipeer[], int rank){  socklen_t longueur = sizeof(int);  int ret;  int optval;  ASSERT(mpipeer[rank].costate == CO_COWAITING);  printi("Connect", "connect waked up rank %d", rank);  if(getsockopt(mpipeer[rank].cofd,SOL_SOCKET,SO_ERROR,&ret,&longueur) < 0) qerror("getsockopt");  switch(ret)  {    case 0 :      /* Remove the delay for small packets */      optval = 1;      if( setsockopt(mpipeer[rank].cofd, IPPROTO_TCP, TCP_NODELAY, (char *) &optval, sizeof(optval)) == -1 ) qerror("setsockopt ");      /* Set the size of the reception buffer */      optval = MAX_BUF_FOR_SOCKET;      if( setsockopt(mpipeer[rank].cofd, SOL_SOCKET, SO_RCVBUF, (char *) &optval, sizeof(optval)) == -1 ) qerror("setsockopt ");      /* and set the size of the send buffer */      optval = MAX_BUF_FOR_SOCKET;      if( setsockopt(mpipeer[rank].cofd, SOL_SOCKET, SO_SNDBUF, (char *) &optval, sizeof(optval)) == -1 ) qerror("setsockopt ");      printi("Connect", "Sending myrank=%d", myrank);      /** @todo htonl */      if(_usend(mpipeer[rank].cofd, &myrank, sizeof(int), 0) != sizeof(int)) qerror("write(%d)", rank);      mpipeer[rank].costate = CO_COINPROGRESS;      break;    case ECONNREFUSED :    case ETIMEDOUT :      printi("Connect", "Connect request, ignoring connect failure to %d error is %s (maybe other peer is down)", rank, strerror(ret));      mpipeer[rank].costate = CO_NOCONNECT;      close(mpipeer[rank].cofd);      mpipeer[rank].cofd = -1;      break;    case EINPROGRESS :      printi("Connect", "Connect to %d : operation in progess",rank);      break;    default :      errno = ret;      qerror("getsockopt on_connect to %d",rank);  }  return;}void on_connected(SockInfo mpipeer[], int rank){  char c;  int n;  printi("Connect", "connect in progress rank %d", rank);  ASSERT(mpipeer[rank].costate == CO_COINPROGRESS);  printi("Connect", "Reading A/R from rank %d", rank);  n = _urecv(mpipeer[rank].cofd, &c, sizeof(char), 0);  if(n != sizeof(char))    {      printi("Connect", "Connect request, other party %d has crashed", rank);      mpipeer[rank].cofd = -1;      return;    }  if(c == 'A')    {      printi("Connect", "received A : Connect request accepted by other party %d -co-> %d", myrank, rank);      ASSERT(mpipeer[rank].fd == -1);      mpipeer[rank].fd = mpipeer[rank].cofd;      mpipeer[rank].cofd = -1;      mpipeer[rank].costate = CO_CONNECT;      mpipeer[rank].hstate = ftp_connection_starting_state( 1 );      printi("Connect","state is now %d", mpipeer[rank].hstate);    }  else    {      printi("Connect", "receive R : Connect request discarded by other party %d", rank);      if(mpipeer[rank].fd == -1)        {          printi("Connect", "Will only accept connection from %d now", rank);          mpipeer[rank].costate = CO_NOCONNECT;        }      close(mpipeer[rank].cofd);      mpipeer[rank].cofd = -1;    }}int on_accept(int scon){  int s;  struct sockaddr_in adress;  socklen_t longueur;  int optval;  longueur = sizeof( struct sockaddr_in );  if((s = accept( scon, ((struct sockaddr *) &adress) , &longueur )) < 0)  {    printe("accepting new peer");    return -1;  }  /* Remove the delay for small packets */  optval = 1;  if( setsockopt(s, IPPROTO_TCP, TCP_NODELAY, (char *) &optval, sizeof(optval)) == -1 ) qerror("setsockopt ");  /* Set the size of the reception buffer */  optval = MAX_BUF_FOR_SOCKET;  if( setsockopt(s, SOL_SOCKET, SO_RCVBUF, (char *) &optval, sizeof(optval)) == -1 ) qerror("setsockopt ");  /* and set the size of the send buffer */  optval = MAX_BUF_FOR_SOCKET;  if( setsockopt(s, SOL_SOCKET, SO_SNDBUF, (char *) &optval, sizeof(optval)) == -1 ) qerror("setsockopt ");  if(s != -1) acceptlist[acceptlistsize++] = s;  return s;}int inline on_accepted(SockInfo mpipeer[], fd_set *readset){  int rank;  char c;  int i, s;  int res = 0;  for(i = 0; i < acceptlistsize; i++)  {    s = acceptlist[i];    if(FD_ISSET(s, readset))    {      res = 1;      printi("Accept", "reading accepted peer rank on accept fd %d", s);      acceptlist[i] = acceptlist[--acceptlistsize];      i--;      if(_urecv(s, &rank, sizeof(int), 0) != sizeof(int))      {        printe("Connecting peer crashed during rank retrieve on fd %d", s);        close(s);        return res;      }      printi("Accept", "peer %d is requesting connection", rank);      if(mpipeer[rank].fd != -1)      {        c = 'R';        printi("Accept", "connection from %d rejected because already connected, sending R", rank);        if(_usend(s, &c, sizeof(char), 0) != sizeof(char)) printe("broken socket while sending reject message to %d", rank);        if(close(s) < 0) printe("closing after reject send to %d", rank);/**< @bug send;close is dangerous */      }      else if((mpipeer[rank].cofd != -1) && (myrank > rank))      {        c = 'R';        printi("Accept", "connection from %d rejected because myrank > rank and I am connecting, sending R", rank);        if(_usend(s, &c, sizeof(char), 0) != sizeof(char)) printe("broken socket while sending reject message to %d", rank);        if(close(s) < 0) printe("closing after reject send to %d", rank);/**< @bug send;close is dangerous */      }      else      {        c = 'A';        printi("Accept", "connection from %d accepted on socket %d, sending A", rank, s);        if(_usend(s, &c, sizeof(char), 0) != sizeof(char))        {          printe("broken socket while sending accept message A to %d", rank);          close(s);          return res;        }        mpipeer[rank].fd = s;        close(mpipeer[rank].cofd);        mpipeer[rank].cofd = -1;        mpipeer[rank].hstate = ftp_connection_starting_state( 0 );        printi("Connect","state is now %d", mpipeer[rank].hstate);      }    }  }  return res;}SockInfo *initMpipeer(int np, int mr){  int i;  SockInfo *mpipeer;  WorldSize = np;  myrank    = mr;  mpipeer = malloc (np * sizeof (SockInfo));  acceptlist = malloc(np * sizeof(int));  for (i = 0; i < np; i++)  {    mpipeer[i].fd = -1;    mpipeer[i].cofd = mpipeer[i].fd = -1;    mpipeer[i].costate = CO_NOCONNECT;    mpipeer[i].hstate = CO_HNOTDONE;    mpipeer[i].addr.sin_family = 0;    acceptlist[i] = -1;    acceptlistsize = 0;  }  return mpipeer;}int inline getconnectingfdset(fd_set * readset, int max){  int i, sock;  int maxi = max;  for(i = 0; i < acceptlistsize; i++)  {    if((sock = acceptlist[i]) != -1)    {      FD_SET(sock, readset);      if(sock > maxi) maxi = sock;    }  }  return maxi;}void parse_config_array (struct sockaddr_in *nodeListArray, SockInfo * mpipeer ){  int rank;  struct sockaddr_in nip;  for (rank = 0; rank < WorldSize; rank++)    {      nip.sin_family = AF_INET;      nip.sin_addr = nodeListArray[rank].sin_addr;      nip.sin_port = nodeListArray[rank].sin_port;      memcpy (&mpipeer[rank].addr, &nip, sizeof (struct sockaddr_in));      printi ("init", "rank %d : %s:%u", rank, inet_ntoa (mpipeer[rank].addr.sin_addr),	      ntohs (mpipeer[rank].addr.sin_port));    }}void init_connect_to_one_MPI(SockInfo *mpipeer, int rank){  printi ("init", "Connect request to MPI peer %d (%s:%d)", rank,	  inet_ntoa (mpipeer[rank].addr.sin_addr),	  ntohs (mpipeer[rank].addr.sin_port));  if (mpipeer[rank].cofd != -1)    {      close (mpipeer[rank].cofd);    }  mpipeer[rank].cofd = _usocket ();  if (fcntl (mpipeer[rank].cofd, F_SETFL, O_NONBLOCK) < 0)    qerror ("connect to peer, fcntl O_NONBLOCK");  if (connect      (mpipeer[rank].cofd, (struct sockaddr *) &mpipeer[rank].addr,       sizeof (struct sockaddr_in)) < 0)    {      if (errno != EINPROGRESS)	qerror ("connect request to MPI peer %d (%s:%d)", rank,		inet_ntoa (mpipeer[rank].addr.sin_addr),		ntohs (mpipeer[rank].addr.sin_port));    }  mpipeer[rank].costate = CO_COWAITING;  printi ("init", "Connect request to MPI peer %d release", rank);}/** * launch all connect requests to all other MPI peers * @param mpipeer filled with addr and ports * @return mpipeer cofd  field */void init_connect_to_MPI (SockInfo *mpipeer, int mrank){  int rank;  for (rank = 0; rank < WorldSize; rank++)    {      if (rank == mrank) continue; /* don't connect to myself  */      if (rank != myrank)	init_connect_to_one_MPI (mpipeer, rank);      else	{	  printi ("init", "Ignoring connect to self %d", myrank);	  continue;	}    }}void on_mpipeer_disconnect (int rank, SockInfo * mpipeer){  printi ("disconnect", "rank %d is disconnected", rank);  if (mpipeer[rank].fd > 0)    {      close (mpipeer[rank].fd);      mpipeer[rank].fd = -1;    }  if (mpipeer[rank].cofd > 0)    {      close (mpipeer[rank].cofd);      mpipeer[rank].cofd = -1;    }  mpipeer[rank].costate = CO_NOCONNECT;  mpipeer[rank].hstate = CO_HNOTDONE;  mpipeer[rank].buf = NULL;}void close_all_daemon_sockets (SockInfo * mpipeer){  int i;  printi ("exit", "is called");  for (i = 0; i < WorldSize; i++)    {      if (mpipeer[i].fd >= 0)	close (mpipeer[i].fd);      if (mpipeer[i].cofd >= 0)	close (mpipeer[i].cofd);    }}int initListenSock(int listenPort){  int scon;  int retListen;  int i;  struct sockaddr_in adresseLocal;	/* address of the listen sock */#define NTRY 120#define DELAY  1  /* listen socket opening */  memset(&adresseLocal, 0, sizeof (struct sockaddr_in));  adresseLocal.sin_family = AF_INET;  adresseLocal.sin_port = htons (listenPort);  adresseLocal.sin_addr.s_addr = htonl (INADDR_ANY);  if ((scon = socket (AF_INET, SOCK_STREAM, 0)) < 0)    {      qerror ("socket opening");    }  i = !0;  if (setsockopt (scon, SOL_SOCKET, SO_REUSEADDR, &i, sizeof (int)) < 0)    {      qerror ("setting reuse of non listening binded addr");    }  for (i = 0; i < NTRY; i++)    {      /* bind of the listening socket */      if (bind(scon, ((struct sockaddr *) &adresseLocal),               sizeof (struct sockaddr_in)) < 0)	{	  printe ("bind on %s:%d (try %d/%d, sleeping 1sec)", inet_ntoa(adresseLocal.sin_addr),		  htons(adresseLocal.sin_port), i, NTRY);	  sleep (DELAY);	}      else	break;    }  /* listen() */  if ((retListen = listen (scon, 5)) < 0)    qerror ("listen");  return scon;}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -