⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 p4_sock_list.c

📁 MPICH是MPI的重要研究,提供了一系列的接口函数,为并行计算的实现提供了编程环境.
💻 C
📖 第 1 页 / 共 2 页
字号:
#include "p4.h"
#include "p4_sys.h"

#ifndef THREAD_LISTENER
/*
 * This listener is based on a rewrite provided by Pete Wykcoff <pw@osc.edu>.
 * His rewrite fixes a number of problems with the multiple slave (comm=shared)
 * version, which could cause the listener to become confused about which
 * slave it was communicating with.  The fix involves using a separate pipe
 * for each slave and keeping track of the state of each individual socket.
 * Thanks, Pete!
 */

/*
 * Per-slave bookkeeping, indexed like listener_info->slave_fd[] and
 * listener_info->slave_pid[].
 *   OK   - no forwarded message is outstanding
 *   BUSY - at least one forwarded message has not been acknowledged yet
 *   DEAD - the slave is gone (signal delivery failed or its pipe hit EOF);
 *          its pipe is no longer watched by select()
 */
typedef struct {
    enum { OK, BUSY, DEAD } state;
    int busycount;  /* number of outstanding messages */
} sock_state_t;

static sock_state_t *sock_state = 0;

static void wakeup_slave(int idx);
static void poke_slave(int idx );
static int process_slave_message(int idx);
static int process_connect_request(int listening_fd);

/*
 * Main loop of the listener process.
 *
 * Waits in select() on the listening socket and on the pipe of every slave
 * that is not DEAD, and dispatches whatever becomes readable to
 * process_connect_request() or process_slave_message().  While any slave is
 * BUSY the select() uses a 100 ms timeout, and on expiry every BUSY slave is
 * poked again.  The loop ends once one of the handlers reports a DIE
 * message; the function then terminates the process with exit(0) and never
 * returns to its caller.
 */
P4VOID listener( void )
{
    P4BOOL done = P4_FALSE;
    int i;

    p4_dprintfl(70, "enter listener, pid %d\n", (int) getpid());
    dump_listener(70);

    sock_state = p4_malloc(listener_info->num * sizeof(*sock_state));
    /* A null result is only an error when there is something to allocate. */
    if (!sock_state && listener_info->num > 0) {
        p4_error("listener: unable to allocate sock_state",
                 listener_info->num);
    }
    for (i = 0; i < listener_info->num; i++) {
        sock_state[i].state = OK;
        sock_state[i].busycount = 0;
    }

    while (!done) {
        int max_fd, nfds, numbusy;
        fd_set read_fds;
        struct timeval tv, *tvp;

        /* Rebuild the read set on every pass: select() overwrites it. */
        FD_ZERO(&read_fds);
        FD_SET(listener_info->listening_fd, &read_fds);
        max_fd = listener_info->listening_fd;
        numbusy = 0;
        for (i = 0; i < listener_info->num; i++) {
            if (sock_state[i].state != DEAD) {
                FD_SET(listener_info->slave_fd[i], &read_fds);
                if (listener_info->slave_fd[i] > max_fd)
                    max_fd = listener_info->slave_fd[i];
                if (sock_state[i].state == BUSY)
                    ++numbusy;
            }
        }

        /* Block indefinitely unless some slave still owes us a reply. */
        if (numbusy) {
            tvp = &tv;
            tv.tv_sec = 0;
            tv.tv_usec = 100000;
        } else {
            tvp = 0;
        }

        /* SYSCALL_P4 retries on EINTR; other errors are fatal */
        SYSCALL_P4(nfds, select(max_fd + 1, &read_fds, 0, 0, tvp));
        if (nfds < 0) {
            p4_error("listener select", nfds);
        }

        if (nfds == 0) {
            if (tvp) {
                for (i = 0; i < listener_info->num; i++) {
                    if (sock_state[i].state == BUSY) {
                        p4_dprintfl(70, "wakeup slave %d from timeout\n", i);
                        /* There is a race condition here.  If the
                           slave wakes up after having read the
                           previous message, then there won't be a message
                           to read.  To handle this, instead of
                           simply signaling the slave, we send a
                           WAKEUP_SLAVE message. */
                        /* A downside to this approach is that messages could
                           pile up in some cases, but this is less likely
                           and harmful than sending a signal without a message.
                        */
                        poke_slave(i);
                    }
                }
            } else {
                p4_dprintfl(70, "select timeout\n");
            }
            continue;
        }

        /* We use |= to ensure that after the loop, we haven't lost
           any "done" messages.
           There really are some nasty race conditions here, and all
           this does is cause us to NOT lose a "DIE" message
        */
        if (FD_ISSET(listener_info->listening_fd, &read_fds)) {
            p4_dprintfl(70, "input on listening_fd %d\n",
                        listener_info->listening_fd);
            done |= process_connect_request(listener_info->listening_fd);
            --nfds;
        }
        /* nfds counts the descriptors still to be serviced; stop early at 0. */
        for (i = 0; nfds && i < listener_info->num; i++) {
            if (FD_ISSET(listener_info->slave_fd[i], &read_fds)) {
                p4_dprintfl(70, "input on pipe %d, slave_fd = %d, pid = %d\n",
                            i, listener_info->slave_fd[i],
                            listener_info->slave_pid[i]);
                done |= process_slave_message(i);
                --nfds;
            }
        }
    }

    /* The descriptors are not closed explicitly; the process exits here. */
    p4_dprintfl(70, "exit listener\n");
    exit(0);
}

/*
 * Return index in array based on incoming pid.
 *
 * Reports the failure through p4_error() when no slave has that pid; the
 * value -1 is returned only if p4_error() itself returns.
 */
static int
lookup_slave_by_pid(int pid)
{
    int i;

    for (i = 0; i < listener_info->num; i++) {
        if (listener_info->slave_pid[i] == pid)
            return i;
    }
    p4_error("lookup_slave_index_by_pid: pid not found = ", pid);
    return -1;
}

/*
 * Forward a message received from the net to the pipe for
 * the destination slave.
*/static voidmessage_to_slave( int idx, struct slave_listener_msg *msg){/*    fprintf (stderr, "send to %d\n", listener_info->slave_fd[idx] ); */    net_send(listener_info->slave_fd[idx], msg, sizeof(*msg), P4_FALSE);    sock_state[idx].state = BUSY;    ++sock_state[idx].busycount;/*    if (sock_state[idx].busycount > 1) { 	fprintf( stderr, "busy count to %d = %d on fd %d\n", 	     idx, sock_state[idx].busycount, listener_info->slave_fd[idx] ) ;    } */    wakeup_slave(idx);    /*  fprintf( stderr, "signal sent\n" );	fflush( stderr );*/}/*  * Send a wakeup message to a slave */static voidpoke_slave( int idx ){    struct slave_listener_msg msg;    msg.type = p4_i_to_n(WAKEUP_SLAVE);    net_send(listener_info->slave_fd[idx], &msg, sizeof(msg), P4_FALSE);    wakeup_slave(idx);}/* * Send a signal to the process telling him to pay attention to the * pipe from the listener. * Question: is there a possible race condition here when the  * processes are exiting?  We've seen some failures the appear to happen * near or during MPI_Finalize. */static voidwakeup_slave( int idx ){    if (kill(listener_info->slave_pid[idx], LISTENER_ATTN_SIGNAL) == -1) {	/* might have died on his own, okay */	p4_dprintf("wakeup_slave: unable to interrupt slave %d pid %d\n",          idx, listener_info->slave_pid[idx]);	sock_state[idx].state = DEAD;    }}/* * Accept a new socket from the network, deal with it, possibly * forwarding the message on to a slave.  The new connection is * always closed immediately after receipt of this message. 
*/static P4BOOL process_connect_request(int listening_fd){    struct slave_listener_msg msg;    int msglen;    int connection_fd;    int from, to_pid, idx, type, lport;    P4BOOL rc = P4_FALSE;    p4_dprintfl(70, "process_connect_request on %d\n", listening_fd);    connection_fd = net_accept(listening_fd);    p4_dprintfl(70, "accepted on connection_fd=%d reading size=%d\n", connection_fd,sizeof(msg));    /* We originally used net_recv here, but there is the chance that a       bogus message arrives.  In that case, we read, discard, and close       the connection.  We detect a bogus message by either a timeout       or invalid message type (we should switch to a session-specific       message cookie).  Because we need a timeout, we can't use net_recv.       Since we don't need a very complex receive message, this isn't       such a bad thing */    msglen = net_recv_timeout(connection_fd, &msg, sizeof(msg), 10);    if (msglen == PRECV_EOF || msglen != sizeof(msg)) {	p4_dprintf("process_connect_request: bad connect request len %d"		   " wanted %d\n", msglen, sizeof(msg));	close(connection_fd);	return (P4_FALSE);    }    close( connection_fd );    type = p4_n_to_i(msg.type);    switch (type)    {      case IGNORE_THIS:	p4_dprintfl(70, "got IGNORE_THIS from net\n");	break;      case DIE:	from = p4_n_to_i(msg.from);	p4_dprintfl(99, "received DIE msg from remote %d\n", from);	rc = P4_TRUE;	break;      case KILL_SLAVE:        /*	 * KILL_SLAVE is used by a remote machine to destroy a particular	 * process here, but not the listener (see DIE).	 * A KILL_SLAVE message is very strong and causes nearly immediate 	 * exit by the slave.  
*/	from = p4_n_to_i(msg.from);	to_pid = p4_n_to_i(msg.to_pid);	idx = lookup_slave_by_pid( to_pid );	p4_dprintfl(10, "received msg for %d: kill_slave from %d to_pid %d\n",	  idx, from, to_pid);	message_to_slave( idx, &msg );#ifdef FOO	slave_fd = listener_info->slave_fd;	if (kill(to_pid, LISTENER_ATTN_SIGNAL) == -1)	{	    p4_dprintf("Listener: Unable to interrupt client pid=%d.\n", to_pid);	    break;	}	net_send(slave_fd, &msg, sizeof(msg), P4_FALSE);	/* wait for msg from slave indicating it got connected */	/*	 * do not accept any more connections for slave until it has fully	 * completed this one, i.e. do not want to interrupt it until it has	 * handled this interrupt	 */	p4_dprintfl(70, "waiting for slave to handle interrupt\n");	net_recv(slave_fd, &msg, sizeof(msg));	/* Check that we get a valid message; for now (see p4_sock_conn/	   handle_connection_interrupt) this is just IGNORE_THIS */	if (p4_i_to_n(msg.type) != IGNORE_THIS) {	    p4_dprintf("received incorrect handshake message type=%d\n", 		       p4_i_to_n(msg.type) );	    p4_error("slave_listener_msg: broken handshake", 		     p4_i_to_n(msg.type));	    }	p4_dprintfl(70, "back from slave handling interrupt\n");#endif	break;      case CONNECTION_REQUEST:	from = p4_n_to_i(msg.from);	to_pid = p4_n_to_i(msg.to_pid);	idx = lookup_slave_by_pid(to_pid);/*	to = p4_n_to_i(msg.to); */	lport = p4_n_to_i(msg.lport); 	p4_dprintfl(70, "process_connect_request: to slave %d pid %d from %d port %d\n", idx, to_pid, from, lport );	message_to_slave( idx, &msg );#ifdef FOO	slave_fd = listener_info->slave_fd;	if (kill(to_pid, LISTENER_ATTN_SIGNAL) == -1)	{	    p4_dprintf("Listener: Unable to interrupt client pid=%d.\n", to_pid);	    break;	}	net_send(slave_fd, &msg, sizeof(msg), P4_FALSE);	/* wait for msg from slave indicating it got connected */	/*	 * do not accept any more connections for slave until it has fully	 * completed this one, i.e. 
do not want to interrupt it until it has	 * handled this interrupt	 */	p4_dprintfl(70, "waiting for slave to handle interrupt\n");	net_recv(slave_fd, &msg, sizeof(msg));	/* Check that we get a valid message; for now (see p4_sock_conn/	   handle_connection_interrupt) this is just IGNORE_THIS */	if (p4_i_to_n(msg.type) != IGNORE_THIS) {	    p4_dprintf("received incorrect handshake message type=%d\n", 		       p4_i_to_n(msg.type) );	    p4_error("slave_listener_msg: broken handshake", 		     p4_i_to_n(msg.type));	    }	p4_dprintfl(70, "back from slave handling interrupt\n");#endif	break;      default:	p4_dprintf("invalid type %d in process_connect_request\n", type);	break;    }    close(connection_fd);    return (rc);}static P4BOOL process_slave_message(int idx){    struct slave_listener_msg msg;    P4BOOL rc = P4_FALSE;    int type, from, cc;    /*     * An EOF will happen naturally if the slave process exits.  Do     * not force an error.  In fact, don't even use net_recv() since     * this is a local pipe.  Just read it.     */    cc = read(listener_info->slave_fd[idx], &msg, sizeof(msg));    if (cc == 0 || (cc < 0 && errno == ECONNRESET)) {	/* ECONNRESET means there was still data on the connection, but	 * it can be ignored since the slave already exited.	 
*/	sock_state[idx].state = DEAD;	close(listener_info->slave_fd[idx]);	return rc;    }    if (cc < 0) {	p4_dprintf("process_slave_message: idx %d fd %d pid %d cc %d"	  " errno %d\n",	  idx, listener_info->slave_fd[idx], listener_info->slave_pid[idx],	  cc, errno);	p4_error("process_slave_message: read pipe", cc);    }    if (cc != sizeof(msg))	p4_error("process_slave_message: short read from pipe", 0);    type = p4_n_to_i(msg.type);    from = p4_n_to_i(msg.from);    switch (type)    {      case IGNORE_THIS:	/* response to forwarded message, clear his busy flag */	if (sock_state[idx].state == BUSY) {	    p4_dprintfl(20, "process_slave_message: slave %d busy was %d\n",	      idx, sock_state[idx].busycount);	    --sock_state[idx].busycount;	    if (sock_state[idx].busycount == 0)		sock_state[idx].state = OK;	} else {	    p4_dprintf("process_slave_message: ignoring IGNORE_THIS for %d",	      idx);	}	break;      case DIE:	/* see DIE from remote above, just quit the listener */	p4_dprintfl(70, "received die msg from slave %d\n", from);	rc = P4_TRUE;	break;      default:	p4_dprintf("received unknown message type=%d from=%d\n", type, from);	p4_error("slave_listener_msg: unknown message type", type);	break;    }    return (rc);}#else /* def THREAD_LISTENER *//*  * The thread listener logic is quite different from the process listener * logic.  This takes advantage of the fact that the thread is in the same * process.  The algorithm is this: *   Let L be the listener thread and P be the "process"/user thread *   To connect, P sends a message to its OWN listener, using the pipe *   between them (this allows L to use a select to wait for work to do). *   P then waits for a message back down the pipe that indicates that the *   connection is ready.  It may get messages about other connections *   becoming ready while it is waiting. * *   L selects on the pipe to P and the external connection socket. 
*   If it gets a request from P, it checks the connection table; if *   the connection has already been made, it ignores the request (since *   the request-ready message is already in the pipe).  Otherwise, it *   creates a new socket and contacts the remote listener. *    *   If the rank of L is LOWER than the rank of the remote L, this is the *   socket that will be used for the connection.  Once the remote listener *   accepts the connection, BOTH listeners (local and remote) transfer the *   socket fd into the connection tables, set the connection to EST, and *   send a message down the pipe to P. * *   If the rank of L is higher than the rank of the remote L, a message is *   sent asking the remote (lower rank) L to establish a connection. *   The socket used for this request is closed when the connection is 

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -