⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 p4_sock_sr.c

📁 MPICH是MPI的重要研究,提供了一系列的接口函数,为并行计算的实现提供了编程环境.
💻 C
📖 第 1 页 / 共 2 页
字号:
	    /* Now, what to do ? */	    }    }    return (P4_FALSE);}P4BOOL sock_msg_avail_on_fd(int fd){    int i, rc, nfds;    struct timeval tv;    fd_set read_fds;    char tempbuf[2];    rc = P4_FALSE;    tv.tv_sec = 0;    tv.tv_usec = 0;    FD_ZERO(&read_fds);    FD_SET(fd, &read_fds);    SYSCALL_P4(nfds, select(fd+1, &read_fds, 0, 0, &tv));    if (nfds == -1)    {	p4_dprintfl(20,"sock_msg_avail_on_fd selected on %d\n", fd);	p4_error("sock_msg_avail_on_fd select", nfds);    }    if (nfds)			/* true even for eof */    {	/* see if data is on the socket or merely an eof condition */	/* this should not loop long because the select succeeded */	while ((rc = recv(fd, tempbuf, 1, MSG_PEEK)) == -1)	    ;		if (rc == 0)	/* if eof */	{	    /* eof; a process has closed its socket; may have died */	    for (i = 0; i < p4_global->num_in_proctable; i++)		if (p4_local->conntab[i].port == fd)		{		    p4_local->conntab[i].type = CONN_REMOTE_DYING;		    /*		    p4_error("tried to read from dead process",-1);		    */		}	}	else	    rc = P4_TRUE;    }    return (rc);}#ifdef CAN_DO_XDRint xdr_recv(fd, rmsg)int fd;struct p4_msg *rmsg;{    xdrproc_t xdr_proc;    XDR *xdr_dec;    char *xdr_buff, *msg;    int n;    int msg_len = 0, nbytes_read = 0;    int xdr_elsize, els_per_buf, xdr_numels;/* Begin bugfix, compute msg_len correct, Rolf Rabenseifner,04SEP97*/    int elsize; /* End   bugfix, compute msg_len correct, Rolf Rabenseifner,04SEP97*/    int xdr_len1, len_bytes;/* See new test (should only need the USE_U_INT_FOR_XDR).  Also,    other int args to xdr_array should also be u_int */#if defined(SUN_SOLARIS) || defined(CRAY) || defined(SGI) || \    defined(USE_U_INT_FOR_XDR)    u_int xdr_len;#elif defined(USE_UNSIGNED_INT_FOR_XDR)    unsigned int xdr_len;#else    int xdr_len;#endif    msg = (char *) &(rmsg->msg);    xdr_dec = &(p4_local->xdr_dec);    xdr_buff = p4_local->xdr_buff;    switch (rmsg->data_type)    {      case P4INT:	xdr_proc = (xdrproc_t) xdr_int;	xdr_elsize = XDR_INT_LEN;/* Begin bugfix, compute xdr_numels correct, Rolf Rabenseifner,04SEP97*/        elsize = sizeof(int);/* End   bugfix, compute xdr_numels correct, Rolf Rabenseifner,04SEP97*/	break;      case P4LNG:	xdr_proc = (xdrproc_t) xdr_long;	xdr_elsize = XDR_LNG_LEN;/* Begin bugfix, compute xdr_numels correct, Rolf Rabenseifner,04SEP97*/        elsize = sizeof(long);/* End   bugfix, compute xdr_numels correct, Rolf Rabenseifner,04SEP97*/	break;      case P4FLT:	xdr_proc = (xdrproc_t) xdr_float;	xdr_elsize = XDR_FLT_LEN;/* Begin bugfix, compute xdr_numels correct, Rolf Rabenseifner,04SEP97*/        elsize = sizeof(float);/* End   bugfix, compute xdr_numels correct, Rolf Rabenseifner,04SEP97*/	break;      case P4DBL:	xdr_proc = (xdrproc_t) xdr_double;	xdr_elsize = XDR_DBL_LEN;/* Begin bugfix, compute xdr_numels correct, Rolf Rabenseifner,04SEP97*/        elsize = sizeof(double);/* End   bugfix, compute xdr_numels correct, Rolf Rabenseifner,04SEP97*/	break;      default:	p4_dprintf("xdr_recv: invalid data type %d\n", rmsg->data_type);	return (-1);    }    xdr_numels = rmsg->len;    els_per_buf = (XDR_BUFF_LEN - XDR_PAD) / xdr_elsize;    while (xdr_numels > 0)    {	if (xdr_numels > els_per_buf)	    xdr_len = els_per_buf;	else	    xdr_len = xdr_numels;	xdr_len1 = xdr_len;	/* remember xdr_len */	len_bytes = (xdr_len * xdr_elsize) + XDR_PAD;    	p4_dprintfl(90, "xdr_recv: reading %d bytes from %d\n", len_bytes, fd);	n = net_recv(fd, xdr_buff, len_bytes);	p4_dprintfl(90, "xdr_recv: read %d bytes \n", n);	if (n < 0)	{	    p4_error("xdr_recv net_recv", n);	}	if (!xdr_setpos(xdr_dec, 0))	{	    p4_dprintf("xdr_recv: xdr_setpos failed\n");	    return (-1);	}	if (!xdr_array(xdr_dec, &msg, &xdr_len, XDR_BUFF_LEN,		       xdr_elsize, xdr_proc))	{	    p4_dprintf("xdr_recv: xdr_array failed\n");	    return (-1);	}	nbytes_read += len_bytes;	xdr_numels -= xdr_len1;/* Begin bugfix, compute xdr_numels correct, Rolf Rabenseifner,04SEP97*/	msg     = msg     + xdr_len1*elsize;	msg_len = msg_len + xdr_len1*elsize;/* instead of  *      msg = msg + len_bytes - XDR_PAD; *      msg_len = msg_len + len_bytes - XDR_PAD; */ /* End   bugfix, compute xdr_numels correct, Rolf Rabenseifner,04SEP97*/    }    rmsg->len = msg_len;    return (msg_len);}#endifP4VOID wait_for_ack( int fd ){    struct p4_msg *ack;    p4_dprintfl(30, "waiting for ack \n");    ack = socket_recv_on_fd(fd);    while (!(ack->ack_req & P4_ACK_REPLY_MASK))    {	queue_p4_message(ack, p4_local->queued_messages);	ack = socket_recv_on_fd(fd);    }    ack->msg_id = (-1);    free_p4_msg(ack);    p4_dprintfl(30, "received ack from %d\n", ack->from);}P4VOID send_ack( int fd, int to ){    struct p4_net_msg_hdr ack;    p4_dprintfl(30, "sending ack to %d\n", to);    ack.from = p4_i_to_n(p4_get_my_id());    ack.data_type = p4_i_to_n(P4NOX);    ack.msg_len = p4_i_to_n(0);    ack.to = p4_i_to_n(to);    ack.ack_req = p4_i_to_n(P4_ACK_REPLY_MASK);    net_send(fd, &ack, sizeof(ack), P4_FALSE);    p4_dprintfl(30, "sent ack to %d\n", to);}P4VOID shutdown_p4_socks()/*  Shutdown all sockets we know about discarding info  in either direction.  */{    int i;    if (!p4_local)		/* Need info to be defined */	return;    if (!p4_local->conntab)	return;    if (p4_local->my_id == LISTENER_ID)	return;    for (i = 0; i < p4_num_total_ids(); i++)	if (p4_local->conntab[i].type == CONN_REMOTE_EST)	{#ifndef SHUT_RDWR/* Posix 1g defines SHUT_RDWR */#define SHUT_RDWR 2#endif	    (P4VOID) shutdown(p4_local->conntab[i].port, SHUT_RDWR);	    /* Do we really want to do a close here ? */	    (P4VOID) close(p4_local->conntab[i].port);	}}/* * Modified socket messages available.  This looks for the important case * of either read on ANY or write on one specified socket.   * Return value is the fd of an available socket, with priority given to  * the write fd.  (i.e., if can write, return that fd first).  Return * -2 if no socket is ready (only if q_block is false) *  * Since the sockets are bi-directional, return -1 for the write_fd ready. * * If q_block is true, block until some fd is ready.   */int p4_sockets_ready( int write_fd, int q_block ){    int i, fd, nfds;    int ndown = 0;    int max_fd;    struct timeval tv, *tv_p;    fd_set read_fds;    fd_set write_fds;    /* The while loop is necessary in case an EINTR causes the available       connections to change.  Note that this may need more changes       for use with the threaded listener */    do {	FD_ZERO(&read_fds);	FD_ZERO(&write_fds);	FD_SET(write_fd,&write_fds);	max_fd = write_fd;	if (p4_global && p4_local && p4_local->conntab) {	    /* This routine may, in some rare cases, be called 	       during the p4_initenv step before the p4_global and p4_local	       structures are fully initialized */	    for (i = 0; i < p4_global->num_in_proctable; i++)	    {		if (p4_local->conntab[i].type == CONN_REMOTE_EST)		{		    fd = p4_local->conntab[i].port;		    FD_SET(fd,&read_fds);		    if (fd > max_fd) max_fd = fd;		}		else if (p4_local->conntab[i].type == CONN_REMOTE_DYING) {		    /* We need to detect that some are down... */		    ndown++;		    /* Now, what to do ? */		}	    }	}	/* Now we have found the fds to wait on */	if (q_block) {	    /* Block forever */	    tv_p = 0;	}	else {	    /* don't block at all */	    tv.tv_sec = 0;	    tv.tv_usec = 0;	    tv_p = &tv;	}	nfds = select(max_fd + 1, &read_fds, &write_fds, 0, tv_p);    } while (nfds == -1 && 	     (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR));    if (nfds == -1)    {	p4_dprintfl(20,"p4_sockets_ready selected on %d\n", write_fd);	p4_error("p4_sockets_ready select", nfds);    }    /* First, check the write fd */    if (FD_ISSET(write_fd,&write_fds)) { return -1; }    /* Otherwise, find an fd and make sure that we can really read on it.       nfds is the number of available fds */    if (nfds == 0) return -2;    for (i = 0; i < p4_global->num_in_proctable; i++)    {	if (p4_local->conntab[i].type == CONN_REMOTE_EST)	{	    fd = p4_local->conntab[i].port;	    if (FD_ISSET(fd,&read_fds)) { 		char tempbuf[2];		int  rc;		/* see if data is on the socket or merely an eof condition */		/* this should not loop long because the select succeeded */		while ((rc = recv(fd, tempbuf, 1, MSG_PEEK)) == -1) {		    /* Should collect a count of the times this occurs */		    ;			}		if (rc == 0)	/* if eof */		{		    /* eof; a process has closed its socket; may have died */		    for (i = 0; i < p4_global->num_in_proctable; i++)			if (p4_local->conntab[i].port == fd)			{			    p4_local->conntab[i].type = CONN_REMOTE_DYING;			    /*			      p4_error("tried to read from dead process",-1);			    */			}		}		else		    return fd;	    }	}    }    return -2;}/* Look for a "close this connection for connection i.  This    reads only a header if there is any data; since we are closing the    connection, any other messages would be an error */void p4_look_for_close( int i ){    struct p4_net_msg_hdr nmsg;    int           fd, n;    fd = p4_local->conntab[i].port;    p4_dprintfl( 90, "Looking for close message for conn %d (fd %d)\n", i, fd );    if ( sock_msg_avail_on_fd( fd ) ) {	/* Read just a header */	n = net_recv(fd, &nmsg, sizeof(struct p4_net_msg_hdr));	if (p4_n_to_i(nmsg.ack_req) & P4_CLOSE_MASK) 	{	    p4_dprintfl(20,"Received looked-for close connection on %d (fd %d)\n",	i, fd );	    p4_local->conntab[i].type = CONN_REMOTE_CLOSED;	}	else {	    p4_dprintfl(90, "Unexpected message seen while closing socket\n" );	}    }}/* Wait until a message is available from any source.   This includes the listener.  Returns one if select found something. */int p4_wait_for_socket_msg( int is_blocking ){    int    i, fd, nfds, max_fd;    struct timeval tv;    fd_set read_fds;    int    nactive;    int    timeout_sec = 9;        if (!is_blocking) timeout_sec = 0;    while (1)     {	tv.tv_sec = timeout_sec;	tv.tv_usec = 0;  /* RMB */	FD_ZERO(&read_fds);	max_fd = -1;#ifdef THREAD_LISTENER	p4_dprintfl(70,"p4_wait_for_socket_msg: p4_local->listener_fd is %d\n",		    p4_local->listener_fd);	FD_SET(p4_local->listener_fd, &read_fds);	max_fd = p4_local->listener_fd;#endif	nactive = 0;	for (i = 0; i < p4_global->num_in_proctable; i++)	{	    if (p4_local->conntab[i].type == CONN_REMOTE_EST)	    {		fd = p4_local->conntab[i].port;		FD_SET(fd, &read_fds);		if (fd > max_fd) max_fd = fd;		nactive++;	    }	}	/* If there is only one process, there will NEVER be any active	   connections.  	   Question: does this cover the case of multiple processes but	   little communication between them, since the connections	   are established dynamically? 	 */#ifndef P4_WITH_MPD	if (!nactive && p4_global->num_in_proctable > 1)	{	    /* There are no active connections! 	       Let some other routine handle this. */	    return 1;	}#endif	/* Run select; if interrupted, get read_fds (in case a connection	   has occurred) and restart the connection */	nfds = select(max_fd + 1, &read_fds, 0, 0, &tv);	if (is_blocking) timeout_sec = 9;	if (nfds == -1 && errno == EINTR) continue;	if (nfds) {	    return 1;	}	if (!is_blocking) {	    /* Did not find anything and non-blocking. */	    return 0;	}    }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -