📄 fmtcp.c
字号:
/*---------------------------------------------------------------------------*//* Portable TCP FM-like implementation. *//* Author(s): Kalyan Perumalla <http://www.cc.gatech.edu/~kalyan> 28July2001 *//* $Revision: 1.9 $ $Name: v26apr05 $ $Date: 2004/04/07 20:02:26 $ *//*---------------------------------------------------------------------------*/#include <stdio.h>#include <stdlib.h>#include <string.h>#include <ctype.h>#include "mycompat.h"/*---------------------------------------------------------------------------*/#if PLATFORM_WIN #include <winsock.h> #define DELFILE_CMD "del" #define FILESEPARATOR "\\" #define MAXHOSTNAMELEN 1000 #define MAXPATHLEN 1000 #define SSIZE_MAX 32768 #define NO_SOCKET INVALID_SOCKET #define SOCK_CMP(a,b) (0) struct iovec { char *iov_base; int iov_len; }; #define SOCKET_READ(s,b,l) recv(s,b,l,0) #define SOCKET_WRITE(s,b,l) send(s,b,l,0) #define SOCKET_CLOSE(s) closesocket(s) static int writen( SOCKET fd, char *buf, int n ); /*Forward declaration*/ static int SOCKET_WRITEV(SOCKET s,const struct iovec *iov, int iovcount) { int i=0, totwritten=0; for(i=0; i<iovcount; i++) { int nw = writen(s, iov[i].iov_base, iov[i].iov_len); if(nw<0) break; totwritten += nw; } return totwritten; } static void SOCKET_INIT(void) { WORD vreq = MAKEWORD(2,2); WSADATA wd; int rcode = WSAStartup(vreq,&wd); int ver_min_high = 1, ver_min_low = 1; MYASSERT( rcode == 0, ("%d",rcode) ); MYASSERT( HIBYTE(wd.wVersion)>ver_min_high || (HIBYTE(wd.wVersion)==ver_min_high && LOBYTE(wd.wVersion)>=ver_min_low), ("%d.%d",wd.wVersion,wd.wVersion) ); } static void SOCKET_CLEANUP(void) { int rcode = WSACleanup(); MYASSERT( rcode == 0, ("%d",rcode) ); }#else #include <limits.h> #include <sys/param.h> #include <sys/types.h> #include <sys/uio.h> #include <sys/time.h> #include <sys/socket.h> #include <unistd.h> #include <netinet/in.h> #include <netinet/tcp.h> #include <arpa/inet.h> #include <netdb.h> typedef int SOCKET; #define NO_SOCKET -1 #define DELFILE_CMD "rm" #define FILESEPARATOR "/" #define SOCK_CMP(a,b) ((a)-(b)) #define SOCKET_READ(s,b,l) read(s,b,l) #define SOCKET_WRITE(s,b,l) write(s,b,l) #define SOCKET_CLOSE(s) close(s) #define SOCKET_WRITEV(s,v,c) writev(s,v,c) #define SOCKET_INIT() (void)0 #define SOCKET_CLEANUP() (void)0#endif#include "fmtcp.h"/*---------------------------------------------------------------------------*/static int tcpfmdbg = 0;/*---------------------------------------------------------------------------*/typedef struct{ int src_id; /*Original sender's caller-specific ID*/ int dest_id; /*Original receiver's caller-specific ID*/ int src_pe; int dest_pe; int handler; int npieces; /*Including this header piece; hence always >=1*/ int piecelen[TCPMAXPIECES]; /*Byte length of each piece*/ int totbytes;/*#bytes in all pieces combined; hence always >= sizeof(hdr)*/} TCPMsgHeaderPiece;/*---------------------------------------------------------------------------*/typedef struct{ struct iovec pieces[TCPMAXPIECES]; TCPMsgHeaderPiece hdr;} TCPSendMsg;/*---------------------------------------------------------------------------*/typedef struct{ TCPMsgHeaderPiece hdr; int npieces_recd; int nbytes_recd;} TCPRecvMsg;/*---------------------------------------------------------------------------*/typedef struct{ SOCKET sockfd; char hostname[MAXHOSTNAMELEN+1]; struct sockaddr_in addr; int port;} TCPPeer;/*---------------------------------------------------------------------------*/typedef struct{ int nodeid; int numnodes; TCPPeer peer[TCPMAXPE]; TCPSendMsg send_msg; TCPRecvMsg recv_msg;} TCPState;/*---------------------------------------------------------------------------*/int TCP_nodeid;int TCP_numnodes;static TCPCallback *fmcb = 0;static TCPState tcps, *tcp = &tcps;static char port_fname[MAXPATHLEN];/*------------------------------------------------------------------------*/unsigned long dot_to_ulong( const char *s ){ unsigned long n; if( !s || !strlen(s) ) return 0UL; n = inet_addr( s ); if( n == (unsigned long)(-1) ) return 0UL; return ntohl(n);}static struct hostent *mygethostbyname( const char *hname ){ int i = 0, dotted = 1; for(i=0; i<strlen(hname); i++) if(!(isdigit(hname[i]) || hname[i]=='.')) { dotted = 0; break; } if( !dotted ) { return gethostbyname(hname); } else { static struct hostent hent; static char *addrlist[2]; static unsigned long addr; memset( &hent, 0, sizeof(hent) ); addr = dot_to_ulong(hname); addr = htonl(addr); addrlist[0] = (char *)&addr; addrlist[1] = 0; hent.h_addr_list = addrlist; hent.h_length = sizeof(addr);if(tcpfmdbg>=3){printf("Detected dotted-IP \"%s\"\n", hname); fflush(stdout);} return &hent; }}#define gethostbyname mygethostbyname/*------------------------------------------------------------------------*//*---------------------------------------------------------------------------*//* *//*---------------------------------------------------------------------------*/static void config( char *hostnames[] ){ int i = 0; char *estr = 0; estr = getenv("FMTCP_DEBUG"); tcpfmdbg = estr ? atoi(estr) : 1;if(tcpfmdbg>=1){printf("FMTCP_DEBUG=%d\n",tcpfmdbg);fflush(stdout);} for( i = 0; i < tcp->numnodes; i++ ) { strcpy( tcp->peer[i].hostname, hostnames[i] );if(tcpfmdbg>=2){printf("TCPFM:host[%d]=\"%s\"\n",i,hostnames[i]);} }}/*---------------------------------------------------------------------------*/static int readn( SOCKET fd, char *buf, int n ){ long nleft, nread; nleft = n; while( nleft > 0 ) { nread = SOCKET_READ( fd, buf, nleft ); if( nread < 0 ) { break; } else if( nread == 0 ) { break; } else { nleft -= nread; buf += nread; } } return (n - nleft);}/*---------------------------------------------------------------------------*/static int writen( SOCKET fd, char *buf, int n ){ long nleft, nwritten; nleft = n; while( nleft > 0 ) { nwritten = SOCKET_WRITE( fd, buf, nleft ); if( nwritten < 0 ) { break; } else if( nwritten == 0 ) { break; } else { nleft -= nwritten; buf += nwritten; } } return (n - nleft);}/*---------------------------------------------------------------------------*/static void make_connections( void ){ int i, j; TCPPeer *self = &tcp->peer[tcp->nodeid]; int master_portnum = -1; for( i = 0; i < TCPMAXPE; i++ ) { TCPPeer *peer = &tcp->peer[i]; peer->sockfd = NO_SOCKET; peer->port = -1; } { char *home_dir = getenv("HOME"); char *session_name = getenv("FMTCP_SESSIONNAME"); char *master_portstr = getenv("FMTCP_MASTERPORT"); if(!session_name) session_name = getenv("SESSIONNAME"); /*Back-compat*/ if(!home_dir) home_dir = "."; if(!session_name) session_name = "fmtcp-portfile"; if(!master_portstr) master_portstr = "-1"; sprintf( port_fname, "%s%s%s", home_dir, FILESEPARATOR, session_name ); master_portnum = atoi(master_portstr); } if( tcp->numnodes > 1 ) { SOCKET sock; int port = -1, port_1st = 23456, port_last = 1000000; int bound = 0, listened = 0; struct sockaddr_in *psin = &self->addr; struct hostent *hent = gethostbyname(self->hostname); MYASSERT( hent, ("hent(%s)",self->hostname); perror("hostent") ); sock = socket( AF_INET, SOCK_STREAM, 0 ); MYASSERT( sock != NO_SOCKET, ("socket %d",sock); perror("socket") ); if( tcp->nodeid == 0 && master_portnum > 0 ) { port_1st = port_last = master_portnum; } else if( getenv("FMTCP_FIRSTTRYPORT") ) { port_1st = atoi(getenv("FMTCP_FIRSTTRYPORT")); }if(tcpfmdbg>=2){printf("FMTCP %d : 1st port to try bind= %d\n",tcp->nodeid,port_1st);fflush(stdout);} for( port = port_1st, bound = 0; port <= port_last; port++) { memset(psin, 0, sizeof(*psin)); psin->sin_port=htons((u_short)port); psin->sin_family = AF_INET; psin->sin_addr.s_addr=htonl(INADDR_ANY); memcpy(&psin->sin_addr, hent->h_addr, hent->h_length); bound = (bind(sock, (struct sockaddr *)psin, sizeof(*psin) ) == 0); if( bound ) break; } MYASSERT( bound, ("bind failed FMTCPID %d",tcp->nodeid); perror("bind") );if(tcpfmdbg>=1){printf("Node %d %s bound to port %d\n", tcp->nodeid, self->hostname, port);fflush(stdout);} listened = listen(sock, 10); MYASSERT( listened >= 0, ("listen %d",listened); perror("listen") ); self->port = port; self->sockfd = sock; MYASSERT( self->port >= 0, ("%d", self->port) ); MYASSERT( self->sockfd != NO_SOCKET, ("%d", self->sockfd) ); } if( tcp->nodeid == 0 ) { /*Advertise master port#*/ master_portnum = self->port; if( !getenv("FMTCP_NOPORTFILE") ) {if(tcpfmdbg>=2){printf("Writing port# to file %s ...", port_fname);fflush(stdout);} /*Write port# to a file */ { FILE *fp = fopen( port_fname, "w" ); MYASSERT( fp, ("TCPFM: Can't write port# to %s\n", port_fname)); fprintf( fp, "%d\n", master_portnum ); fflush( fp ); fclose( fp ); fp = 0; }if(tcpfmdbg>=2){printf("Done.\n");fflush(stdout);} } } else { /*Obtain master port#*/ if( master_portnum <= 0 ) { while( master_portnum <= 0 ) { FILE *fp = 0;if(tcpfmdbg>=2){printf("Reading port# from file %s ...", port_fname);fflush(stdout);} fp = fopen( port_fname, "r" ); if( !fp || fscanf( fp, "%d", &master_portnum ) != 1 ) {if(tcpfmdbg>=2){printf("\nRetrying..."); fflush(stdout);} sleep( 2/*sec*/ ); } if(fp) fclose( fp ); }if(tcpfmdbg>=2){printf("Done.\nMaster port# is %d\n", master_portnum);} } MYASSERT( master_portnum > 0, ("FMTCP master port required") ); tcp->peer[0].port = master_portnum; } for( i = 0; i < tcp->nodeid; i++ ) { SOCKET sock; TCPPeer *peer = &tcp->peer[i]; int atry = 0, maxtries = 1000; int port = peer->port, connected = 0; const char *hname = peer->hostname; MYASSERT( port >= 0, ("Port[%d] = %d",i,port) ); for( atry = 0; atry < maxtries; atry++ ) { struct sockaddr_in *psin = &peer->addr; struct hostent *hent = gethostbyname(hname); MYASSERT( hent, ("hent[%d]%s",i,hname); perror("hostent") ); memset(psin, 0, sizeof(*psin)); memcpy(&psin->sin_addr, hent->h_addr, hent->h_length); psin->sin_family = AF_INET; psin->sin_port = htons((u_short)port); sock = socket(AF_INET, SOCK_STREAM, 0); MYASSERT( sock!=NO_SOCKET, ("socket[%d]%s",i,hname);perror("socket")); connected=(connect(sock,(struct sockaddr *)psin,sizeof(*psin))==0); if( connected ) break; perror("Retrying. connect()"); fflush(stderr); SOCKET_CLOSE(sock); sock = NO_SOCKET; sleep(1);if(tcpfmdbg>=2){printf("Try %d\n",atry);fflush(stdout);} } MYASSERT( connected, ("Connection to %s:%d",hname,port) );if(tcpfmdbg>=2){printf( "Node %d connected to node %d - %s:%d\n", tcp->nodeid, i, hname, port);fflush(stdout);} peer->sockfd = sock; { int nw = writen( sock, (char *)&tcp->nodeid, sizeof(tcp->nodeid) ); MYASSERT( nw == sizeof(tcp->nodeid), ("!") ); } if( i == 0 ) { int nwritten = -1; nwritten = writen( peer->sockfd, (char *)&self->port, sizeof(self->port) ); MYASSERT( nwritten == sizeof(self->port), ("!") ); for( j = 0; j < tcp->numnodes; j++ ) { int port = -1, nread = 0; nread = readn(peer->sockfd, (char *)&port, sizeof(port)); MYASSERT( nread == sizeof(port) && port>=0, ("%d %d",nread,port)); if( j == tcp->nodeid ) MYASSERT( port == self->port, ("!") ); else tcp->peer[j].port = port; } } } for( i = tcp->nodeid+1; i < tcp->numnodes; i++ ) { int k = -1; SOCKET sock; struct sockaddr_in clin; int clilen = sizeof(clin); sock = accept( self->sockfd, (struct sockaddr *)&clin, &clilen ); MYASSERT( sock != NO_SOCKET, ("accept[%d]",i); perror("accept") );if(tcpfmdbg>=2){printf("Node %d recd connection #%d\n", tcp->nodeid, i-tcp->nodeid-1);fflush(stdout);} { int nread = readn( sock, (char *)&k, sizeof(k) ); MYASSERT( nread == sizeof(k), ("!") ); }if(tcpfmdbg>=3){printf("Node %d recd ID %d\n", tcp->nodeid, k);fflush(stdout);}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -