⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 fmtcp.c

📁 基于linux环境的ns2多机并行仿真补丁
💻 C
📖 第 1 页 / 共 2 页
字号:
	MYASSERT( tcp->nodeid < k && k < tcp->numnodes,	        ("%d must lie in [%d..%d]", k, tcp->nodeid+1, tcp->numnodes-1));	{	    TCPPeer *peer = &tcp->peer[k];	    MYASSERT( peer->sockfd == NO_SOCKET,	            ("%d already connected? sockfd=%d", k, peer->sockfd) );	    peer->sockfd = sock;	    peer->addr = clin;	}    }    if( tcp->numnodes > 1 && tcp->nodeid == 0 )    {        for( i = 0; i < tcp->numnodes; i++ )	{	    if( i != tcp->nodeid )	    {	        SOCKET fd = tcp->peer[i].sockfd;	        int port = -1, nread = -1;		nread = readn( fd, (char *)&port, sizeof(port) );		MYASSERT( nread == sizeof(port), ("!") );		MYASSERT( port >= 0, ("%d",port) );		tcp->peer[i].port = port;if(tcpfmdbg>=3){printf("Node 0 recd port %d from node %d\n", port, i);fflush(stdout);}	    }	}        for( i = 0; i < tcp->numnodes; i++ )	{	    if( i != tcp->nodeid )	    {	        SOCKET fd = tcp->peer[i].sockfd;	        for( j = 0; j < tcp->numnodes; j++ )	        {		    int port = tcp->peer[j].port, nwritten = -1;		    MYASSERT( port >= 0, ("!") );		    nwritten = writen( fd, (char *)&port, sizeof(port) );		    MYASSERT( nwritten==sizeof(port),		            ("%d %d",nwritten,sizeof(port)) );	        }	    }	}    }    SOCKET_CLOSE( self->sockfd ); self->sockfd = NO_SOCKET;    for( i = 0; i < tcp->numnodes; i++ )    {        if( i != tcp->nodeid )	{	    TCPPeer *peer = &tcp->peer[i];	    SOCKET fd = peer->sockfd;	    int f = 1, retval = -1;	    int rsz = 16000000; /*CUSTOMIZE*/	    int ssz = 16000000; /*CUSTOMIZE*/	    int defrsz = 0, newrsz = rsz, defssz = 0, newssz = ssz;	    int optlen=sizeof(int);	    struct linger lr;	    retval = getsockopt( fd, SOL_SOCKET, SO_RCVBUF,	                         (char *)&defrsz, &optlen );	    MYASSERT( retval >= 0, ("!"); perror("getsockopt") );	    newrsz = defrsz;if(tcpfmdbg>=4){printf("Node %d: Default SO_RCVBUF %d bytes\n",tcp->nodeid,defrsz);}	    for( ; rsz>0 && rsz>defrsz; rsz /= 1.5 )	    {if(tcpfmdbg>=4){printf("Node %d: Increasing SO_RCVBUF to %d bytes\n",tcp->nodeid,rsz);}	        retval = setsockopt( fd, SOL_SOCKET, SO_RCVBUF,	                             (const char *)&rsz, sizeof(rsz) );	        retval = getsockopt( fd, SOL_SOCKET, SO_RCVBUF,	                             (char *)&newrsz, &optlen );	        MYASSERT( retval >= 0, ("!"); perror("getsockopt") );	        if( newrsz >= rsz ) break;if(tcpfmdbg>=4){printf("Node %d: Couldn't increase SOCKET RCVBUF from %d to %d. Now set to %d.\n", tcp->nodeid, defrsz, rsz, newrsz);}	    }            MYASSERT( rsz > 0, ("%d", rsz) );if(tcpfmdbg>=4){printf("Node %d: Using SO_RCVBUF %d bytes\n",tcp->nodeid,newrsz);}	    retval = getsockopt( fd, SOL_SOCKET, SO_SNDBUF,	                         (char *)&defssz, &optlen );	    MYASSERT( retval >= 0, ("!"); perror("getsockopt") );	    newssz = defssz;if(tcpfmdbg>=4){printf("Node %d: Default SO_SNDBUF %d bytes\n",tcp->nodeid,defssz);}            for( ; ssz>0 && ssz>defssz; ssz /= 1.5 )            {	        retval = setsockopt( fd, SOL_SOCKET, SO_SNDBUF,	                             (const char *)&ssz, sizeof(ssz) );	        retval = getsockopt( fd, SOL_SOCKET, SO_SNDBUF,	                             (char *)&newssz, &optlen );	        MYASSERT( retval >= 0, ("!"); perror("getsockopt") );	        if( newssz >= ssz ) break;if(tcpfmdbg>=4){printf("Node %d: Couldn't increase SOCKET SNDBUF from %d to %d. Now set to %d.\n", tcp->nodeid, defssz, ssz, newssz);}            }            MYASSERT( ssz > 0, ("%d", ssz) );if(tcpfmdbg>=4){printf("Node %d: Using SO_SNDBUF %d bytes\n",tcp->nodeid,newssz);}	    lr.l_onoff = 1; lr.l_linger = 5;	    retval = setsockopt( fd, SOL_SOCKET, SO_LINGER,	                         (const char *)&lr, sizeof(lr) );	    MYASSERT( retval >= 0, ("!"); perror("setsockopt") );	    retval = setsockopt( fd, IPPROTO_TCP, TCP_NODELAY,	                         (const char *)&f, sizeof(f) );	    MYASSERT( retval >= 0, ("!"); perror("setsockopt") );	}    }}/*---------------------------------------------------------------------------*//*                                                                           *//*---------------------------------------------------------------------------*/void TCP_initialize( char *hostnames[], int nodeid, int numnodes,    TCPCallback *cb ){if(tcpfmdbg>=1){printf("TCP_initialize() started.\n");fflush(stdout);}    MYASSERT( cb, ("Message callback required") );    fmcb = cb;    SOCKET_INIT();    TCP_nodeid = tcp->nodeid = nodeid;    TCP_numnodes = tcp->numnodes = numnodes;if(tcpfmdbg>=1){printf( "TCP_nodeid=%d, TCP_numnodes=%d\n", TCP_nodeid, TCP_numnodes);fflush(stdout);}    MYASSERT( 0 < tcp->numnodes && tcp->numnodes <= TCPMAXPE,            ("#nodes %d must be in [1..%d]", tcp->numnodes, TCPMAXPE) );    MYASSERT( 0 <= tcp->nodeid && tcp->nodeid < tcp->numnodes,            ("Node %d must be in [0..%d]", tcp->nodeid, tcp->numnodes-1) );    config( hostnames );    if( tcp->numnodes > 1 )    {        make_connections();    }if(tcpfmdbg>=1){printf("TCP_initialize() done.\n");fflush(stdout);}}/*---------------------------------------------------------------------------*/void TCP_finalize( void ){    int i = 0;    for( i = 0; i < tcp->numnodes; i++ )    {        SOCKET *pfd = &tcp->peer[i].sockfd;	if( *pfd != NO_SOCKET )	{	    SOCKET_CLOSE( *pfd );	}	*pfd = NO_SOCKET;    }    SOCKET_CLEANUP();}/*---------------------------------------------------------------------------*/TCP_stream *TCP_begin_message( int recipient, int length, int handler,    int src_id, int dest_id ){    TCPSendMsg *msg = &tcp->send_msg;    TCPMsgHeaderPiece *hdr = &msg->hdr;    struct iovec *iov = &msg->pieces[0];    MYASSERT( 0 <= recipient && recipient < tcp->numnodes,            ("Bad TCP recipient ID %d must be in [0..%d]",	     recipient, tcp->numnodes-1) );    MYASSERT( 0 <= length && length <= SSIZE_MAX,            ("Msg size %d must be <= %d", length, SSIZE_MAX));    hdr->src_id = src_id;    hdr->dest_id = dest_id;    hdr->src_pe = tcp->nodeid;    hdr->dest_pe = recipient;    hdr->handler = handler;    hdr->npieces = 1;    hdr->piecelen[0] = sizeof(*hdr);    hdr->totbytes = sizeof(*hdr);    iov->iov_base = (char *)hdr;    iov->iov_len = hdr->totbytes;    return (TCP_stream *)msg;}/*---------------------------------------------------------------------------*/void TCP_send_piece( TCP_stream *sendstream, void *buffer, int length ){    TCPSendMsg *msg = &tcp->send_msg;    TCPMsgHeaderPiece *hdr = &msg->hdr;    MYASSERT( sendstream == (TCP_stream *)msg, ("!") );    MYASSERT( buffer && 0 <= length && length <= (SSIZE_MAX-hdr->totbytes),            ("buffer=%p, length = %d SSIZE_MAX=%d", buffer, length, SSIZE_MAX));    MYASSERT( hdr->npieces < TCPMAXPIECES,            ("TCP msg pieces can't exceed compiled %d pieces", TCPMAXPIECES) );    MYASSERT( length <= TCPMAXPIECELEN,            ("Piece len %d can't exceed compiled %d", length, TCPMAXPIECELEN));    {    int pn = hdr->npieces++;    struct iovec *iov = &msg->pieces[pn];    iov->iov_base = buffer;    iov->iov_len = length;    hdr->piecelen[pn] = length;    hdr->totbytes += length;    }}/*---------------------------------------------------------------------------*/void TCP_end_message( TCP_stream *sendstream ){    TCPSendMsg *msg = &tcp->send_msg;    TCPMsgHeaderPiece *hdr = &msg->hdr;    int to = hdr->dest_pe, nwritten = 0;    SOCKET sockfd = tcp->peer[to].sockfd;    MYASSERT( sendstream == (TCP_stream *)msg, ("!") );    if( sockfd != NO_SOCKET )    {        nwritten = SOCKET_WRITEV( sockfd, msg->pieces, hdr->npieces );        MYASSERT( nwritten == hdr->totbytes,                ("nwritten %d must equal totbytes %d",nwritten,hdr->totbytes) );    }}/*---------------------------------------------------------------------------*/int TCP_receive( void *buffer, TCP_stream *receivestream, unsigned int length ){    TCPRecvMsg *msg = &tcp->recv_msg;    TCPMsgHeaderPiece *hdr = &msg->hdr;    SOCKET *pfd = &tcp->peer[hdr->src_pe].sockfd;    int nread = 0, pn = 0;    MYASSERT( receivestream == msg, ("!") );    pn = msg->npieces_recd++;    if( pn > 0 )    {        MYASSERT( pn < hdr->npieces,	        ("Only #%d pieces", hdr->npieces) );        MYASSERT( length == hdr->piecelen[pn],	        ("request len %d != sent len %d", length, hdr->piecelen[pn]));        MYASSERT( length <= hdr->totbytes - msg->nbytes_recd,                ("%d, %d, %d", length, hdr->totbytes, msg->nbytes_recd) );    }    if( *pfd != NO_SOCKET )    {        nread = readn( *pfd, buffer, length );    }    if( nread <= 0 )    {        SOCKET_CLOSE( *pfd );	*pfd = NO_SOCKET;    }    msg->nbytes_recd += length;    return nread;}/*---------------------------------------------------------------------------*/static int recv_one_msg( int pe ){    int i = 0, nbytes = 0, nrecd = 0;    SOCKET fd = tcp->peer[pe].sockfd;    TCPRecvMsg *msg = &tcp->recv_msg;    TCPMsgHeaderPiece temphdr, *hdr = &msg->hdr;    MYASSERT( fd != NO_SOCKET, ("Ensure valid fd[%d]=%d", pe, fd) );    msg->nbytes_recd = 0; msg->npieces_recd = 0;    hdr->src_id = -1; hdr->dest_id = -1;    hdr->src_pe = pe; hdr->dest_pe = tcp->nodeid;    hdr->handler = 0; hdr->npieces = 0; hdr->totbytes = 0;    nrecd = TCP_receive( &temphdr, msg, sizeof(temphdr) );    if( nrecd <= 0 )    {	/*Do nothing*/    }    else    {    MYASSERT( temphdr.src_pe == hdr->src_pe,            ("src pes must agree: %d != %d", temphdr.src_pe, hdr->src_pe) );    MYASSERT( temphdr.dest_pe == hdr->dest_pe,            ("dest pes must agree: %d != %d", temphdr.dest_pe, hdr->dest_pe) );    MYASSERT( temphdr.totbytes >= sizeof(*hdr),            ("msg size: %d >= %d", temphdr.totbytes, sizeof(*hdr)) );    MYASSERT( temphdr.piecelen[0] == sizeof(*hdr),            ("piecelen[0] %d != hdr size %d",temphdr.piecelen[0],sizeof(*hdr)));    *hdr = temphdr;    fmcb( hdr->handler, msg, hdr->src_pe, hdr->src_id, hdr->dest_id );    nrecd = 0;    for( i = msg->npieces_recd; i < hdr->npieces; i++ )    {	char buf[TCPMAXPIECELEN];	int len = hdr->piecelen[i];	MYASSERT( len <= sizeof(buf), ("%d %d",len,sizeof(buf)) );        nrecd = TCP_receive( buf, msg, len );	if( nrecd <= 0 ) break;    }    MYASSERT( nrecd <= 0 || msg->nbytes_recd == hdr->totbytes,            ("%d, %d", msg->nbytes_recd, hdr->totbytes) );    nbytes = msg->nbytes_recd;    }    return nbytes;}/*---------------------------------------------------------------------------*/static int poll_all_once( unsigned int maxbytes ){    struct timeval tv;    int i = 0, maxfd = 0, nready = 0, nbytes = 0;    fd_set rfds;    FD_ZERO( &rfds );    for( i = 0; i < tcp->numnodes; i++ )    {        if( i != tcp->nodeid )	{	    SOCKET fd = tcp->peer[i].sockfd;	    if( fd != NO_SOCKET )	    {	        FD_SET( fd, &rfds );		if( SOCK_CMP(fd, maxfd) > 0 ) maxfd = fd;	    }	}    }    tv.tv_sec = tv.tv_usec = 0;    nready = select( maxfd+1, &rfds, 0, 0, &tv );    if( nready < 0 )    {if(0){MYASSERT( 0, ("select() failed"); perror("") );}    }    else if( nready == 0 )    {    }    else    {        for( i = 0; i < tcp->numnodes-1 && nbytes < maxbytes; i++ )        {            static int pe = 0;            if( pe == tcp->nodeid ) pe++;	    pe %= tcp->numnodes;	    {	        SOCKET fd = tcp->peer[pe].sockfd;	        if( fd != NO_SOCKET && FD_ISSET( fd, &rfds ) )		{		    nbytes += recv_one_msg( pe );		}	    }	    pe++; pe %= tcp->numnodes;        }    }    return nbytes;}/*---------------------------------------------------------------------------*/int TCP_extract( unsigned int maxbytes ){    int nbytes = 0;    static int removed_portfile = 0;    if( TCP_nodeid == 0 && !removed_portfile )    {	char cmd[MAXPATHLEN+100];	sprintf(cmd,"%s %s", DELFILE_CMD, port_fname);	system(cmd);        removed_portfile = 1;    }if( tcp->numnodes <= 1 ) return 0;    while( nbytes < maxbytes )    {	int m = poll_all_once( maxbytes-nbytes );	if( m <= 0 ) break;        nbytes += m;    }    return nbytes;}/*---------------------------------------------------------------------------*/int TCP_numpieces( TCP_stream *tcp_stream ){    TCPRecvMsg *msg = &tcp->recv_msg;    TCPMsgHeaderPiece *hdr = &msg->hdr;    MYASSERT( tcp_stream == msg, ("!") );    return hdr->npieces-1;}/*---------------------------------------------------------------------------*/int TCP_piecelen( TCP_stream *tcp_stream, int i ){    TCPRecvMsg *msg = &tcp->recv_msg;    TCPMsgHeaderPiece *hdr = &msg->hdr;    MYASSERT( tcp_stream == msg, ("!") );    MYASSERT( 0<=i && i < hdr->npieces-1, ("Only #%d pieces", hdr->npieces-1) );    return hdr->piecelen[i+1];}/*---------------------------------------------------------------------------*/int TCP_debug_level( int level ){    int old = tcpfmdbg;    tcpfmdbg = level;    return old;}/*---------------------------------------------------------------------------*/

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -