📄 fmtcp.c
字号:
MYASSERT( tcp->nodeid < k && k < tcp->numnodes, ("%d must lie in [%d..%d]", k, tcp->nodeid+1, tcp->numnodes-1)); { TCPPeer *peer = &tcp->peer[k]; MYASSERT( peer->sockfd == NO_SOCKET, ("%d already connected? sockfd=%d", k, peer->sockfd) ); peer->sockfd = sock; peer->addr = clin; } } if( tcp->numnodes > 1 && tcp->nodeid == 0 ) { for( i = 0; i < tcp->numnodes; i++ ) { if( i != tcp->nodeid ) { SOCKET fd = tcp->peer[i].sockfd; int port = -1, nread = -1; nread = readn( fd, (char *)&port, sizeof(port) ); MYASSERT( nread == sizeof(port), ("!") ); MYASSERT( port >= 0, ("%d",port) ); tcp->peer[i].port = port;if(tcpfmdbg>=3){printf("Node 0 recd port %d from node %d\n", port, i);fflush(stdout);} } } for( i = 0; i < tcp->numnodes; i++ ) { if( i != tcp->nodeid ) { SOCKET fd = tcp->peer[i].sockfd; for( j = 0; j < tcp->numnodes; j++ ) { int port = tcp->peer[j].port, nwritten = -1; MYASSERT( port >= 0, ("!") ); nwritten = writen( fd, (char *)&port, sizeof(port) ); MYASSERT( nwritten==sizeof(port), ("%d %d",nwritten,sizeof(port)) ); } } } } SOCKET_CLOSE( self->sockfd ); self->sockfd = NO_SOCKET; for( i = 0; i < tcp->numnodes; i++ ) { if( i != tcp->nodeid ) { TCPPeer *peer = &tcp->peer[i]; SOCKET fd = peer->sockfd; int f = 1, retval = -1; int rsz = 16000000; /*CUSTOMIZE*/ int ssz = 16000000; /*CUSTOMIZE*/ int defrsz = 0, newrsz = rsz, defssz = 0, newssz = ssz; int optlen=sizeof(int); struct linger lr; retval = getsockopt( fd, SOL_SOCKET, SO_RCVBUF, (char *)&defrsz, &optlen ); MYASSERT( retval >= 0, ("!"); perror("getsockopt") ); newrsz = defrsz;if(tcpfmdbg>=4){printf("Node %d: Default SO_RCVBUF %d bytes\n",tcp->nodeid,defrsz);} for( ; rsz>0 && rsz>defrsz; rsz /= 1.5 ) {if(tcpfmdbg>=4){printf("Node %d: Increasing SO_RCVBUF to %d bytes\n",tcp->nodeid,rsz);} retval = setsockopt( fd, SOL_SOCKET, SO_RCVBUF, (const char *)&rsz, sizeof(rsz) ); retval = getsockopt( fd, SOL_SOCKET, SO_RCVBUF, (char *)&newrsz, &optlen ); MYASSERT( retval >= 0, ("!"); perror("getsockopt") ); if( newrsz >= rsz ) break;if(tcpfmdbg>=4){printf("Node %d: Couldn't increase SOCKET RCVBUF from %d to %d. Now set to %d.\n", tcp->nodeid, defrsz, rsz, newrsz);} } MYASSERT( rsz > 0, ("%d", rsz) );if(tcpfmdbg>=4){printf("Node %d: Using SO_RCVBUF %d bytes\n",tcp->nodeid,newrsz);} retval = getsockopt( fd, SOL_SOCKET, SO_SNDBUF, (char *)&defssz, &optlen ); MYASSERT( retval >= 0, ("!"); perror("getsockopt") ); newssz = defssz;if(tcpfmdbg>=4){printf("Node %d: Default SO_SNDBUF %d bytes\n",tcp->nodeid,defssz);} for( ; ssz>0 && ssz>defssz; ssz /= 1.5 ) { retval = setsockopt( fd, SOL_SOCKET, SO_SNDBUF, (const char *)&ssz, sizeof(ssz) ); retval = getsockopt( fd, SOL_SOCKET, SO_SNDBUF, (char *)&newssz, &optlen ); MYASSERT( retval >= 0, ("!"); perror("getsockopt") ); if( newssz >= ssz ) break;if(tcpfmdbg>=4){printf("Node %d: Couldn't increase SOCKET SNDBUF from %d to %d. Now set to %d.\n", tcp->nodeid, defssz, ssz, newssz);} } MYASSERT( ssz > 0, ("%d", ssz) );if(tcpfmdbg>=4){printf("Node %d: Using SO_SNDBUF %d bytes\n",tcp->nodeid,newssz);} lr.l_onoff = 1; lr.l_linger = 5; retval = setsockopt( fd, SOL_SOCKET, SO_LINGER, (const char *)&lr, sizeof(lr) ); MYASSERT( retval >= 0, ("!"); perror("setsockopt") ); retval = setsockopt( fd, IPPROTO_TCP, TCP_NODELAY, (const char *)&f, sizeof(f) ); MYASSERT( retval >= 0, ("!"); perror("setsockopt") ); } }}/*---------------------------------------------------------------------------*//* *//*---------------------------------------------------------------------------*/void TCP_initialize( char *hostnames[], int nodeid, int numnodes, TCPCallback *cb ){if(tcpfmdbg>=1){printf("TCP_initialize() started.\n");fflush(stdout);} MYASSERT( cb, ("Message callback required") ); fmcb = cb; SOCKET_INIT(); TCP_nodeid = tcp->nodeid = nodeid; TCP_numnodes = tcp->numnodes = numnodes;if(tcpfmdbg>=1){printf( "TCP_nodeid=%d, TCP_numnodes=%d\n", TCP_nodeid, TCP_numnodes);fflush(stdout);} MYASSERT( 0 < tcp->numnodes && tcp->numnodes <= TCPMAXPE, ("#nodes %d must be in [1..%d]", tcp->numnodes, TCPMAXPE) ); MYASSERT( 0 <= tcp->nodeid && tcp->nodeid < tcp->numnodes, ("Node %d must be in [0..%d]", tcp->nodeid, tcp->numnodes-1) ); config( hostnames ); if( tcp->numnodes > 1 ) { make_connections(); }if(tcpfmdbg>=1){printf("TCP_initialize() done.\n");fflush(stdout);}}/*---------------------------------------------------------------------------*/void TCP_finalize( void ){ int i = 0; for( i = 0; i < tcp->numnodes; i++ ) { SOCKET *pfd = &tcp->peer[i].sockfd; if( *pfd != NO_SOCKET ) { SOCKET_CLOSE( *pfd ); } *pfd = NO_SOCKET; } SOCKET_CLEANUP();}/*---------------------------------------------------------------------------*/TCP_stream *TCP_begin_message( int recipient, int length, int handler, int src_id, int dest_id ){ TCPSendMsg *msg = &tcp->send_msg; TCPMsgHeaderPiece *hdr = &msg->hdr; struct iovec *iov = &msg->pieces[0]; MYASSERT( 0 <= recipient && recipient < tcp->numnodes, ("Bad TCP recipient ID %d must be in [0..%d]", recipient, tcp->numnodes-1) ); MYASSERT( 0 <= length && length <= SSIZE_MAX, ("Msg size %d must be <= %d", length, SSIZE_MAX)); hdr->src_id = src_id; hdr->dest_id = dest_id; hdr->src_pe = tcp->nodeid; hdr->dest_pe = recipient; hdr->handler = handler; hdr->npieces = 1; hdr->piecelen[0] = sizeof(*hdr); hdr->totbytes = sizeof(*hdr); iov->iov_base = (char *)hdr; iov->iov_len = hdr->totbytes; return (TCP_stream *)msg;}/*---------------------------------------------------------------------------*/void TCP_send_piece( TCP_stream *sendstream, void *buffer, int length ){ TCPSendMsg *msg = &tcp->send_msg; TCPMsgHeaderPiece *hdr = &msg->hdr; MYASSERT( sendstream == (TCP_stream *)msg, ("!") ); MYASSERT( buffer && 0 <= length && length <= (SSIZE_MAX-hdr->totbytes), ("buffer=%p, length = %d SSIZE_MAX=%d", buffer, length, SSIZE_MAX)); MYASSERT( hdr->npieces < TCPMAXPIECES, ("TCP msg pieces can't exceed compiled %d pieces", TCPMAXPIECES) ); MYASSERT( length <= TCPMAXPIECELEN, ("Piece len %d can't exceed compiled %d", length, TCPMAXPIECELEN)); { int pn = hdr->npieces++; struct iovec *iov = &msg->pieces[pn]; iov->iov_base = buffer; iov->iov_len = length; hdr->piecelen[pn] = length; hdr->totbytes += length; }}/*---------------------------------------------------------------------------*/void TCP_end_message( TCP_stream *sendstream ){ TCPSendMsg *msg = &tcp->send_msg; TCPMsgHeaderPiece *hdr = &msg->hdr; int to = hdr->dest_pe, nwritten = 0; SOCKET sockfd = tcp->peer[to].sockfd; MYASSERT( sendstream == (TCP_stream *)msg, ("!") ); if( sockfd != NO_SOCKET ) { nwritten = SOCKET_WRITEV( sockfd, msg->pieces, hdr->npieces ); MYASSERT( nwritten == hdr->totbytes, ("nwritten %d must equal totbytes %d",nwritten,hdr->totbytes) ); }}/*---------------------------------------------------------------------------*/int TCP_receive( void *buffer, TCP_stream *receivestream, unsigned int length ){ TCPRecvMsg *msg = &tcp->recv_msg; TCPMsgHeaderPiece *hdr = &msg->hdr; SOCKET *pfd = &tcp->peer[hdr->src_pe].sockfd; int nread = 0, pn = 0; MYASSERT( receivestream == msg, ("!") ); pn = msg->npieces_recd++; if( pn > 0 ) { MYASSERT( pn < hdr->npieces, ("Only #%d pieces", hdr->npieces) ); MYASSERT( length == hdr->piecelen[pn], ("request len %d != sent len %d", length, hdr->piecelen[pn])); MYASSERT( length <= hdr->totbytes - msg->nbytes_recd, ("%d, %d, %d", length, hdr->totbytes, msg->nbytes_recd) ); } if( *pfd != NO_SOCKET ) { nread = readn( *pfd, buffer, length ); } if( nread <= 0 ) { SOCKET_CLOSE( *pfd ); *pfd = NO_SOCKET; } msg->nbytes_recd += length; return nread;}/*---------------------------------------------------------------------------*/static int recv_one_msg( int pe ){ int i = 0, nbytes = 0, nrecd = 0; SOCKET fd = tcp->peer[pe].sockfd; TCPRecvMsg *msg = &tcp->recv_msg; TCPMsgHeaderPiece temphdr, *hdr = &msg->hdr; MYASSERT( fd != NO_SOCKET, ("Ensure valid fd[%d]=%d", pe, fd) ); msg->nbytes_recd = 0; msg->npieces_recd = 0; hdr->src_id = -1; hdr->dest_id = -1; hdr->src_pe = pe; hdr->dest_pe = tcp->nodeid; hdr->handler = 0; hdr->npieces = 0; hdr->totbytes = 0; nrecd = TCP_receive( &temphdr, msg, sizeof(temphdr) ); if( nrecd <= 0 ) { /*Do nothing*/ } else { MYASSERT( temphdr.src_pe == hdr->src_pe, ("src pes must agree: %d != %d", temphdr.src_pe, hdr->src_pe) ); MYASSERT( temphdr.dest_pe == hdr->dest_pe, ("dest pes must agree: %d != %d", temphdr.dest_pe, hdr->dest_pe) ); MYASSERT( temphdr.totbytes >= sizeof(*hdr), ("msg size: %d >= %d", temphdr.totbytes, sizeof(*hdr)) ); MYASSERT( temphdr.piecelen[0] == sizeof(*hdr), ("piecelen[0] %d != hdr size %d",temphdr.piecelen[0],sizeof(*hdr))); *hdr = temphdr; fmcb( hdr->handler, msg, hdr->src_pe, hdr->src_id, hdr->dest_id ); nrecd = 0; for( i = msg->npieces_recd; i < hdr->npieces; i++ ) { char buf[TCPMAXPIECELEN]; int len = hdr->piecelen[i]; MYASSERT( len <= sizeof(buf), ("%d %d",len,sizeof(buf)) ); nrecd = TCP_receive( buf, msg, len ); if( nrecd <= 0 ) break; } MYASSERT( nrecd <= 0 || msg->nbytes_recd == hdr->totbytes, ("%d, %d", msg->nbytes_recd, hdr->totbytes) ); nbytes = msg->nbytes_recd; } return nbytes;}/*---------------------------------------------------------------------------*/static int poll_all_once( unsigned int maxbytes ){ struct timeval tv; int i = 0, maxfd = 0, nready = 0, nbytes = 0; fd_set rfds; FD_ZERO( &rfds ); for( i = 0; i < tcp->numnodes; i++ ) { if( i != tcp->nodeid ) { SOCKET fd = tcp->peer[i].sockfd; if( fd != NO_SOCKET ) { FD_SET( fd, &rfds ); if( SOCK_CMP(fd, maxfd) > 0 ) maxfd = fd; } } } tv.tv_sec = tv.tv_usec = 0; nready = select( maxfd+1, &rfds, 0, 0, &tv ); if( nready < 0 ) {if(0){MYASSERT( 0, ("select() failed"); perror("") );} } else if( nready == 0 ) { } else { for( i = 0; i < tcp->numnodes-1 && nbytes < maxbytes; i++ ) { static int pe = 0; if( pe == tcp->nodeid ) pe++; pe %= tcp->numnodes; { SOCKET fd = tcp->peer[pe].sockfd; if( fd != NO_SOCKET && FD_ISSET( fd, &rfds ) ) { nbytes += recv_one_msg( pe ); } } pe++; pe %= tcp->numnodes; } } return nbytes;}/*---------------------------------------------------------------------------*/int TCP_extract( unsigned int maxbytes ){ int nbytes = 0; static int removed_portfile = 0; if( TCP_nodeid == 0 && !removed_portfile ) { char cmd[MAXPATHLEN+100]; sprintf(cmd,"%s %s", DELFILE_CMD, port_fname); system(cmd); removed_portfile = 1; }if( tcp->numnodes <= 1 ) return 0; while( nbytes < maxbytes ) { int m = poll_all_once( maxbytes-nbytes ); if( m <= 0 ) break; nbytes += m; } return nbytes;}/*---------------------------------------------------------------------------*/int TCP_numpieces( TCP_stream *tcp_stream ){ TCPRecvMsg *msg = &tcp->recv_msg; TCPMsgHeaderPiece *hdr = &msg->hdr; MYASSERT( tcp_stream == msg, ("!") ); return hdr->npieces-1;}/*---------------------------------------------------------------------------*/int TCP_piecelen( TCP_stream *tcp_stream, int i ){ TCPRecvMsg *msg = &tcp->recv_msg; TCPMsgHeaderPiece *hdr = &msg->hdr; MYASSERT( tcp_stream == msg, ("!") ); MYASSERT( 0<=i && i < hdr->npieces-1, ("Only #%d pieces", hdr->npieces-1) ); return hdr->piecelen[i+1];}/*---------------------------------------------------------------------------*/int TCP_debug_level( int level ){ int old = tcpfmdbg; tcpfmdbg = level; return old;}/*---------------------------------------------------------------------------*/
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -