📄 connect.c
字号:
/** @file connect.c implements the connection methods to other peer */#include "config.h"#include "debug.h"#include "connect.h"#include "utils_socket.h"#include "daemoncom.h"#include <stdlib.h>#include <unistd.h>#include <sys/types.h>#include <sys/socket.h>#include <netinet/in.h>#include <netinet/tcp.h>#include <arpa/inet.h>#include <fcntl.h>/** implement the connection functions between daemons */static int *acceptlist; /**< List of waiting for accept socket */static int acceptlistsize = 0;static int WorldSize = -1; /**< size of the world */static int myrank = -1; /**< my position in the world */void on_connect(SockInfo mpipeer[], int rank){ socklen_t longueur = sizeof(int); int ret; int optval; ASSERT(mpipeer[rank].costate == CO_COWAITING); printi("Connect", "connect waked up rank %d", rank); if(getsockopt(mpipeer[rank].cofd,SOL_SOCKET,SO_ERROR,&ret,&longueur) < 0) qerror("getsockopt"); switch(ret) { case 0 : /* Remove the delay for small packets */ optval = 1; if( setsockopt(mpipeer[rank].cofd, IPPROTO_TCP, TCP_NODELAY, (char *) &optval, sizeof(optval)) == -1 ) qerror("setsockopt "); /* Set the size of the reception buffer */ optval = MAX_BUF_FOR_SOCKET; if( setsockopt(mpipeer[rank].cofd, SOL_SOCKET, SO_RCVBUF, (char *) &optval, sizeof(optval)) == -1 ) qerror("setsockopt "); /* and set the size of the send buffer */ optval = MAX_BUF_FOR_SOCKET; if( setsockopt(mpipeer[rank].cofd, SOL_SOCKET, SO_SNDBUF, (char *) &optval, sizeof(optval)) == -1 ) qerror("setsockopt "); printi("Connect", "Sending myrank=%d", myrank); /** @todo htonl */ if(_usend(mpipeer[rank].cofd, &myrank, sizeof(int), 0) != sizeof(int)) qerror("write(%d)", rank); mpipeer[rank].costate = CO_COINPROGRESS; break; case ECONNREFUSED : case ETIMEDOUT : printi("Connect", "Connect request, ignoring connect failure to %d error is %s (maybe other peer is down)", rank, strerror(ret)); mpipeer[rank].costate = CO_NOCONNECT; close(mpipeer[rank].cofd); mpipeer[rank].cofd = -1; break; case EINPROGRESS : printi("Connect", "Connect to %d : operation in progess",rank); break; default : errno = ret; qerror("getsockopt on_connect to %d",rank); } return;}void on_connected(SockInfo mpipeer[], int rank){ char c; int n; printi("Connect", "connect in progress rank %d", rank); ASSERT(mpipeer[rank].costate == CO_COINPROGRESS); printi("Connect", "Reading A/R from rank %d", rank); n = _urecv(mpipeer[rank].cofd, &c, sizeof(char), 0); if(n != sizeof(char)) { printi("Connect", "Connect request, other party %d has crashed", rank); mpipeer[rank].cofd = -1; return; } if(c == 'A') { printi("Connect", "received A : Connect request accepted by other party %d -co-> %d", myrank, rank); ASSERT(mpipeer[rank].fd == -1); mpipeer[rank].fd = mpipeer[rank].cofd; mpipeer[rank].cofd = -1; mpipeer[rank].costate = CO_CONNECT; mpipeer[rank].hstate = ftp_connection_starting_state( 1 ); printi("Connect","state is now %d", mpipeer[rank].hstate); } else { printi("Connect", "receive R : Connect request discarded by other party %d", rank); if(mpipeer[rank].fd == -1) { printi("Connect", "Will only accept connection from %d now", rank); mpipeer[rank].costate = CO_NOCONNECT; } close(mpipeer[rank].cofd); mpipeer[rank].cofd = -1; }}int on_accept(int scon){ int s; struct sockaddr_in adress; socklen_t longueur; int optval; longueur = sizeof( struct sockaddr_in ); if((s = accept( scon, ((struct sockaddr *) &adress) , &longueur )) < 0) { printe("accepting new peer"); return -1; } /* Remove the delay for small packets */ optval = 1; if( setsockopt(s, IPPROTO_TCP, TCP_NODELAY, (char *) &optval, sizeof(optval)) == -1 ) qerror("setsockopt "); /* Set the size of the reception buffer */ optval = MAX_BUF_FOR_SOCKET; if( setsockopt(s, SOL_SOCKET, SO_RCVBUF, (char *) &optval, sizeof(optval)) == -1 ) qerror("setsockopt "); /* and set the size of the send buffer */ optval = MAX_BUF_FOR_SOCKET; if( setsockopt(s, SOL_SOCKET, SO_SNDBUF, (char *) &optval, sizeof(optval)) == -1 ) qerror("setsockopt "); if(s != -1) acceptlist[acceptlistsize++] = s; return s;}int inline on_accepted(SockInfo mpipeer[], fd_set *readset){ int rank; char c; int i, s; int res = 0; for(i = 0; i < acceptlistsize; i++) { s = acceptlist[i]; if(FD_ISSET(s, readset)) { res = 1; printi("Accept", "reading accepted peer rank on accept fd %d", s); acceptlist[i] = acceptlist[--acceptlistsize]; i--; if(_urecv(s, &rank, sizeof(int), 0) != sizeof(int)) { printe("Connecting peer crashed during rank retrieve on fd %d", s); close(s); return res; } printi("Accept", "peer %d is requesting connection", rank); if(mpipeer[rank].fd != -1) { c = 'R'; printi("Accept", "connection from %d rejected because already connected, sending R", rank); if(_usend(s, &c, sizeof(char), 0) != sizeof(char)) printe("broken socket while sending reject message to %d", rank); if(close(s) < 0) printe("closing after reject send to %d", rank);/**< @bug send;close is dangerous */ } else if((mpipeer[rank].cofd != -1) && (myrank > rank)) { c = 'R'; printi("Accept", "connection from %d rejected because myrank > rank and I am connecting, sending R", rank); if(_usend(s, &c, sizeof(char), 0) != sizeof(char)) printe("broken socket while sending reject message to %d", rank); if(close(s) < 0) printe("closing after reject send to %d", rank);/**< @bug send;close is dangerous */ } else { c = 'A'; printi("Accept", "connection from %d accepted on socket %d, sending A", rank, s); if(_usend(s, &c, sizeof(char), 0) != sizeof(char)) { printe("broken socket while sending accept message A to %d", rank); close(s); return res; } mpipeer[rank].fd = s; close(mpipeer[rank].cofd); mpipeer[rank].cofd = -1; mpipeer[rank].hstate = ftp_connection_starting_state( 0 ); printi("Connect","state is now %d", mpipeer[rank].hstate); } } } return res;}SockInfo *initMpipeer(int np, int mr){ int i; SockInfo *mpipeer; WorldSize = np; myrank = mr; mpipeer = malloc (np * sizeof (SockInfo)); acceptlist = malloc(np * sizeof(int)); for (i = 0; i < np; i++) { mpipeer[i].fd = -1; mpipeer[i].cofd = mpipeer[i].fd = -1; mpipeer[i].costate = CO_NOCONNECT; mpipeer[i].hstate = CO_HNOTDONE; mpipeer[i].addr.sin_family = 0; acceptlist[i] = -1; acceptlistsize = 0; } return mpipeer;}int inline getconnectingfdset(fd_set * readset, int max){ int i, sock; int maxi = max; for(i = 0; i < acceptlistsize; i++) { if((sock = acceptlist[i]) != -1) { FD_SET(sock, readset); if(sock > maxi) maxi = sock; } } return maxi;}void parse_config_array (struct sockaddr_in *nodeListArray, SockInfo * mpipeer ){ int rank; struct sockaddr_in nip; for (rank = 0; rank < WorldSize; rank++) { nip.sin_family = AF_INET; nip.sin_addr = nodeListArray[rank].sin_addr; nip.sin_port = nodeListArray[rank].sin_port; memcpy (&mpipeer[rank].addr, &nip, sizeof (struct sockaddr_in)); printi ("init", "rank %d : %s:%u", rank, inet_ntoa (mpipeer[rank].addr.sin_addr), ntohs (mpipeer[rank].addr.sin_port)); }}void init_connect_to_one_MPI(SockInfo *mpipeer, int rank){ printi ("init", "Connect request to MPI peer %d (%s:%d)", rank, inet_ntoa (mpipeer[rank].addr.sin_addr), ntohs (mpipeer[rank].addr.sin_port)); if (mpipeer[rank].cofd != -1) { close (mpipeer[rank].cofd); } mpipeer[rank].cofd = _usocket (); if (fcntl (mpipeer[rank].cofd, F_SETFL, O_NONBLOCK) < 0) qerror ("connect to peer, fcntl O_NONBLOCK"); if (connect (mpipeer[rank].cofd, (struct sockaddr *) &mpipeer[rank].addr, sizeof (struct sockaddr_in)) < 0) { if (errno != EINPROGRESS) qerror ("connect request to MPI peer %d (%s:%d)", rank, inet_ntoa (mpipeer[rank].addr.sin_addr), ntohs (mpipeer[rank].addr.sin_port)); } mpipeer[rank].costate = CO_COWAITING; printi ("init", "Connect request to MPI peer %d release", rank);}/** * launch all connect requests to all other MPI peers * @param mpipeer filled with addr and ports * @return mpipeer cofd field */void init_connect_to_MPI (SockInfo *mpipeer, int mrank){ int rank; for (rank = 0; rank < WorldSize; rank++) { if (rank == mrank) continue; /* don't connect to myself */ if (rank != myrank) init_connect_to_one_MPI (mpipeer, rank); else { printi ("init", "Ignoring connect to self %d", myrank); continue; } }}void on_mpipeer_disconnect (int rank, SockInfo * mpipeer){ printi ("disconnect", "rank %d is disconnected", rank); if (mpipeer[rank].fd > 0) { close (mpipeer[rank].fd); mpipeer[rank].fd = -1; } if (mpipeer[rank].cofd > 0) { close (mpipeer[rank].cofd); mpipeer[rank].cofd = -1; } mpipeer[rank].costate = CO_NOCONNECT; mpipeer[rank].hstate = CO_HNOTDONE; mpipeer[rank].buf = NULL;}void close_all_daemon_sockets (SockInfo * mpipeer){ int i; printi ("exit", "is called"); for (i = 0; i < WorldSize; i++) { if (mpipeer[i].fd >= 0) close (mpipeer[i].fd); if (mpipeer[i].cofd >= 0) close (mpipeer[i].cofd); }}int initListenSock(int listenPort){ int scon; int retListen; int i; struct sockaddr_in adresseLocal; /* address of the listen sock */#define NTRY 120#define DELAY 1 /* listen socket opening */ memset(&adresseLocal, 0, sizeof (struct sockaddr_in)); adresseLocal.sin_family = AF_INET; adresseLocal.sin_port = htons (listenPort); adresseLocal.sin_addr.s_addr = htonl (INADDR_ANY); if ((scon = socket (AF_INET, SOCK_STREAM, 0)) < 0) { qerror ("socket opening"); } i = !0; if (setsockopt (scon, SOL_SOCKET, SO_REUSEADDR, &i, sizeof (int)) < 0) { qerror ("setting reuse of non listening binded addr"); } for (i = 0; i < NTRY; i++) { /* bind of the listening socket */ if (bind(scon, ((struct sockaddr *) &adresseLocal), sizeof (struct sockaddr_in)) < 0) { printe ("bind on %s:%d (try %d/%d, sleeping 1sec)", inet_ntoa(adresseLocal.sin_addr), htons(adresseLocal.sin_port), i, NTRY); sleep (DELAY); } else break; } /* listen() */ if ((retListen = listen (scon, 5)) < 0) qerror ("listen"); return scon;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -