📄 p4_sock_list.c
字号:
#include "p4.h"
#include "p4_sys.h"

#ifndef THREAD_LISTENER
/*
 * This listener is based on a rewrite provided by Pete Wykcoff <pw@osc.edu>.
 * His rewrite fixes a number of problems with the multiple slave (comm=shared)
 * version, which could cause the listener to become confused about which
 * slave it was communicating with.  The fix involves using a separate pipe
 * for each slave and keeping track of the state of each individual socket.
 * Thanks, Pete!
 */

/*
 * Per-slave bookkeeping kept by the listener, indexed like
 * listener_info->slave_fd[] and listener_info->slave_pid[].
 */
typedef struct {
    enum { OK, BUSY, DEAD } state;
    int busycount;              /* number of outstanding messages */
} sock_state_t;

static sock_state_t *sock_state = 0;

static void wakeup_slave(int idx);
static void poke_slave(int idx );
/* Declared with the same return type (P4BOOL) as their definitions. */
static P4BOOL process_slave_message(int idx);
static P4BOOL process_connect_request(int listening_fd);

/*
 * Main loop of the listener process.
 *
 * Waits with select() on the listening socket and on the descriptor of
 * every slave that is not DEAD, and dispatches each readable descriptor
 * to process_connect_request() or process_slave_message().  While any
 * slave is BUSY the select uses a 100 ms timeout; on timeout every BUSY
 * slave is poked again.  The loop ends once a handler reports "done"
 * (a DIE message), after which the process exits with status 0.
 */
P4VOID listener( void )
{
    P4BOOL done = P4_FALSE;
    int i;

    p4_dprintfl(70, "enter listener, pid %d\n", getpid());
    dump_listener(70);

    sock_state = p4_malloc(listener_info->num * sizeof(*sock_state));
    if (!sock_state) {
        /* Do not dereference a failed allocation in the loop below. */
        p4_error("listener: unable to allocate sock_state", 0);
    }
    for (i = 0; i < listener_info->num; i++) {
        sock_state[i].state = OK;
        sock_state[i].busycount = 0;
    }

    while (!done) {
        int max_fd, nfds, numbusy;
        fd_set read_fds;
        struct timeval tv, *tvp;

        /* Rebuild the read set on every pass: select() modifies it. */
        FD_ZERO(&read_fds);
        FD_SET(listener_info->listening_fd, &read_fds);
        max_fd = listener_info->listening_fd;
        numbusy = 0;
        for (i = 0; i < listener_info->num; i++) {
            if (sock_state[i].state != DEAD) {
                FD_SET(listener_info->slave_fd[i], &read_fds);
                if (listener_info->slave_fd[i] > max_fd)
                    max_fd = listener_info->slave_fd[i];
                if (sock_state[i].state == BUSY)
                    ++numbusy;
            }
        }

        /* Only use a timeout when some slave still owes us a reply. */
        if (numbusy) {
            tvp = &tv;
            tv.tv_sec = 0;
            tv.tv_usec = 100000;
        }
        else
            tvp = 0;

        /* SYSCALL_P4 retries on EINTR; other errors are fatal */
        SYSCALL_P4(nfds, select(max_fd + 1, &read_fds, 0, 0, tvp));
        if (nfds < 0) {
            p4_error("listener select", nfds);
        }

        if (nfds == 0) {
            if (tvp) {
                for (i = 0; i < listener_info->num; i++)
                    if (sock_state[i].state == BUSY) {
                        p4_dprintfl(70, "wakeup slave %d from timeout\n", i);
                        /* There is a race condition here.
                           If the slave wakes up after having read the
                           previous message, then there won't be a message
                           to read.  To handle this, instead of simply
                           signaling the slave, we send a WAKEUP_SLAVE
                           message. */
                        /* A downside to this approach is that messages
                           could pile up in some cases, but this is less
                           likely and less harmful than sending a signal
                           without a message. */
                        poke_slave(i);
                    }
            }
            else
                p4_dprintfl(70, "select timeout\n");
            continue;
        }

        /* We use |= to ensure that after the loop, we haven't lost any
           "done" messages.  There really are some nasty race conditions
           here, and all this does is cause us to NOT lose a "DIE"
           message */
        if (FD_ISSET(listener_info->listening_fd, &read_fds)) {
            p4_dprintfl(70, "input on listening_fd %d\n",
                        listener_info->listening_fd);
            done |= process_connect_request(listener_info->listening_fd);
            --nfds;
        }
        /* Stop scanning as soon as every ready descriptor was handled. */
        for (i = 0; nfds && i < listener_info->num; i++) {
            if (FD_ISSET(listener_info->slave_fd[i], &read_fds)) {
                p4_dprintfl(70, "input on pipe %d, slave_fd = %d, pid = %d\n",
                            i, listener_info->slave_fd[i],
                            listener_info->slave_pid[i]);
                done |= process_slave_message(i);
                --nfds;
            }
        }
    }

    /* The listening socket and slave descriptors are not closed
       explicitly; the process exits right away. */
    p4_dprintfl(70, "exit listener\n");
    exit(0);
}

/*
 * Return index in array based on incoming pid.
 *
 * Calls p4_error() when no slave has that pid; -1 is returned only if
 * p4_error() itself returns.
 */
static int
lookup_slave_by_pid(int pid)
{
    int i;

    for (i = 0; i < listener_info->num; i++) {
        if (listener_info->slave_pid[i] == pid)
            return i;
    }
    p4_error("lookup_slave_index_by_pid: pid not found = ", pid);
    return -1;
}

/*
 * Forward a message received from the net to the pipe for
 * the destination slave.
*/static voidmessage_to_slave( int idx, struct slave_listener_msg *msg){/* fprintf (stderr, "send to %d\n", listener_info->slave_fd[idx] ); */ net_send(listener_info->slave_fd[idx], msg, sizeof(*msg), P4_FALSE); sock_state[idx].state = BUSY; ++sock_state[idx].busycount;/* if (sock_state[idx].busycount > 1) { fprintf( stderr, "busy count to %d = %d on fd %d\n", idx, sock_state[idx].busycount, listener_info->slave_fd[idx] ) ; } */ wakeup_slave(idx); /* fprintf( stderr, "signal sent\n" ); fflush( stderr );*/}/* * Send a wakeup message to a slave */static voidpoke_slave( int idx ){ struct slave_listener_msg msg; msg.type = p4_i_to_n(WAKEUP_SLAVE); net_send(listener_info->slave_fd[idx], &msg, sizeof(msg), P4_FALSE); wakeup_slave(idx);}/* * Send a signal to the process telling him to pay attention to the * pipe from the listener. * Question: is there a possible race condition here when the * processes are exiting? We've seen some failures the appear to happen * near or during MPI_Finalize. */static voidwakeup_slave( int idx ){ if (kill(listener_info->slave_pid[idx], LISTENER_ATTN_SIGNAL) == -1) { /* might have died on his own, okay */ p4_dprintf("wakeup_slave: unable to interrupt slave %d pid %d\n", idx, listener_info->slave_pid[idx]); sock_state[idx].state = DEAD; }}/* * Accept a new socket from the network, deal with it, possibly * forwarding the message on to a slave. The new connection is * always closed immediately after receipt of this message. */static P4BOOL process_connect_request(int listening_fd){ struct slave_listener_msg msg; int msglen; int connection_fd; int from, to_pid, idx, type, lport; P4BOOL rc = P4_FALSE; p4_dprintfl(70, "process_connect_request on %d\n", listening_fd); connection_fd = net_accept(listening_fd); p4_dprintfl(70, "accepted on connection_fd=%d reading size=%d\n", connection_fd,sizeof(msg)); /* We originally used net_recv here, but there is the chance that a bogus message arrives. In that case, we read, discard, and close the connection. 
We detect a bogus message by either a timeout or invalid message type (we should switch to a session-specific message cookie). Because we need a timeout, we can't use net_recv. Since we don't need a very complex receive message, this isn't such a bad thing */ msglen = net_recv_timeout(connection_fd, &msg, sizeof(msg), 10); if (msglen == PRECV_EOF || msglen != sizeof(msg)) { p4_dprintf("process_connect_request: bad connect request len %d" " wanted %d\n", msglen, sizeof(msg)); close(connection_fd); return (P4_FALSE); } close( connection_fd ); type = p4_n_to_i(msg.type); switch (type) { case IGNORE_THIS: p4_dprintfl(70, "got IGNORE_THIS from net\n"); break; case DIE: from = p4_n_to_i(msg.from); p4_dprintfl(99, "received DIE msg from remote %d\n", from); rc = P4_TRUE; break; case KILL_SLAVE: /* * KILL_SLAVE is used by a remote machine to destroy a particular * process here, but not the listener (see DIE). * A KILL_SLAVE message is very strong and causes nearly immediate * exit by the slave. */ from = p4_n_to_i(msg.from); to_pid = p4_n_to_i(msg.to_pid); idx = lookup_slave_by_pid( to_pid ); p4_dprintfl(10, "received msg for %d: kill_slave from %d to_pid %d\n", idx, from, to_pid); message_to_slave( idx, &msg );#ifdef FOO slave_fd = listener_info->slave_fd; if (kill(to_pid, LISTENER_ATTN_SIGNAL) == -1) { p4_dprintf("Listener: Unable to interrupt client pid=%d.\n", to_pid); break; } net_send(slave_fd, &msg, sizeof(msg), P4_FALSE); /* wait for msg from slave indicating it got connected */ /* * do not accept any more connections for slave until it has fully * completed this one, i.e. 
do not want to interrupt it until it has * handled this interrupt */ p4_dprintfl(70, "waiting for slave to handle interrupt\n"); net_recv(slave_fd, &msg, sizeof(msg)); /* Check that we get a valid message; for now (see p4_sock_conn/ handle_connection_interrupt) this is just IGNORE_THIS */ if (p4_i_to_n(msg.type) != IGNORE_THIS) { p4_dprintf("received incorrect handshake message type=%d\n", p4_i_to_n(msg.type) ); p4_error("slave_listener_msg: broken handshake", p4_i_to_n(msg.type)); } p4_dprintfl(70, "back from slave handling interrupt\n");#endif break; case CONNECTION_REQUEST: from = p4_n_to_i(msg.from); to_pid = p4_n_to_i(msg.to_pid); idx = lookup_slave_by_pid(to_pid);/* to = p4_n_to_i(msg.to); */ lport = p4_n_to_i(msg.lport); p4_dprintfl(70, "process_connect_request: to slave %d pid %d from %d port %d\n", idx, to_pid, from, lport ); message_to_slave( idx, &msg );#ifdef FOO slave_fd = listener_info->slave_fd; if (kill(to_pid, LISTENER_ATTN_SIGNAL) == -1) { p4_dprintf("Listener: Unable to interrupt client pid=%d.\n", to_pid); break; } net_send(slave_fd, &msg, sizeof(msg), P4_FALSE); /* wait for msg from slave indicating it got connected */ /* * do not accept any more connections for slave until it has fully * completed this one, i.e. 
do not want to interrupt it until it has * handled this interrupt */ p4_dprintfl(70, "waiting for slave to handle interrupt\n"); net_recv(slave_fd, &msg, sizeof(msg)); /* Check that we get a valid message; for now (see p4_sock_conn/ handle_connection_interrupt) this is just IGNORE_THIS */ if (p4_i_to_n(msg.type) != IGNORE_THIS) { p4_dprintf("received incorrect handshake message type=%d\n", p4_i_to_n(msg.type) ); p4_error("slave_listener_msg: broken handshake", p4_i_to_n(msg.type)); } p4_dprintfl(70, "back from slave handling interrupt\n");#endif break; default: p4_dprintf("invalid type %d in process_connect_request\n", type); break; } close(connection_fd); return (rc);}static P4BOOL process_slave_message(int idx){ struct slave_listener_msg msg; P4BOOL rc = P4_FALSE; int type, from, cc; /* * An EOF will happen naturally if the slave process exits. Do * not force an error. In fact, don't even use net_recv() since * this is a local pipe. Just read it. */ cc = read(listener_info->slave_fd[idx], &msg, sizeof(msg)); if (cc == 0 || (cc < 0 && errno == ECONNRESET)) { /* ECONNRESET means there was still data on the connection, but * it can be ignored since the slave already exited. 
*/ sock_state[idx].state = DEAD; close(listener_info->slave_fd[idx]); return rc; } if (cc < 0) { p4_dprintf("process_slave_message: idx %d fd %d pid %d cc %d" " errno %d\n", idx, listener_info->slave_fd[idx], listener_info->slave_pid[idx], cc, errno); p4_error("process_slave_message: read pipe", cc); } if (cc != sizeof(msg)) p4_error("process_slave_message: short read from pipe", 0); type = p4_n_to_i(msg.type); from = p4_n_to_i(msg.from); switch (type) { case IGNORE_THIS: /* response to forwarded message, clear his busy flag */ if (sock_state[idx].state == BUSY) { p4_dprintfl(20, "process_slave_message: slave %d busy was %d\n", idx, sock_state[idx].busycount); --sock_state[idx].busycount; if (sock_state[idx].busycount == 0) sock_state[idx].state = OK; } else { p4_dprintf("process_slave_message: ignoring IGNORE_THIS for %d", idx); } break; case DIE: /* see DIE from remote above, just quit the listener */ p4_dprintfl(70, "received die msg from slave %d\n", from); rc = P4_TRUE; break; default: p4_dprintf("received unknown message type=%d from=%d\n", type, from); p4_error("slave_listener_msg: unknown message type", type); break; } return (rc);}#else /* def THREAD_LISTENER *//* * The thread listener logic is quite different from the process listener * logic. This takes advantage of the fact that the thread is in the same * process. The algorithm is this: * Let L be the listener thread and P be the "process"/user thread * To connect, P sends a message to its OWN listener, using the pipe * between them (this allows L to use a select to wait for work to do). * P then waits for a message back down the pipe that indicates that the * connection is ready. It may get messages about other connections * becoming ready while it is waiting. * * L selects on the pipe to P and the external connection socket. 
* If it gets a request from P, it checks the connection table; if * the connection has already been made, it ignores the request (since * the request-ready message is already in the pipe). Otherwise, it * creates a new socket and contacts the remote listener. * * If the rank of L is LOWER than the rank of the remote L, this is the * socket that will be used for the connection. Once the remote listener * accepts the connection, BOTH listeners (local and remote) transfer the * socket fd into the connection tables, set the connection to EST, and * send a message down the pipe to P. * * If the rank of L is higher than the rank of the remote L, a message is * sent asking the remote (lower rank) L to establish a connection. * The socket used for this request is closed when the connection is
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -