📄 p4_sock_list.c
字号:
#include "p4.h"
#include "p4_sys.h"

#ifndef THREAD_LISTENER

static P4BOOL process_slave_message(int fd);
static P4BOOL process_connect_request(int fd);

/*
 * Main loop of the (non-threaded) listener.
 *
 * Waits in select() on the listening socket and on slave_fd[0] of
 * listener_info.  Readable descriptors are dispatched in ascending fd
 * order to process_connect_request() or process_slave_message().  The
 * loop ends once a handler reports "done"; the listening socket is then
 * closed and the process terminates with exit(0), so this function does
 * not return to its caller.
 */
P4VOID listener()
{
    struct listener_data *l = listener_info;
    P4BOOL done = P4_FALSE;
    fd_set read_fds;
    int i, nfds, fd;
    int scan_limit;		/* highest watched fd + 1 */

    p4_dprintfl(70, "enter listener \n");
    dump_listener(70);

    while (!done)
    {
	FD_ZERO(&read_fds);
	FD_SET(l->listening_fd, &read_fds);
	FD_SET(l->slave_fd[0], &read_fds);

	/*
	 * select() only examines descriptors below its first argument, so
	 * derive that argument from the descriptors actually watched (the
	 * same computation thread_listener uses).  This also keeps the
	 * scan below from probing read_fds past the watched range.
	 */
	scan_limit = l->listening_fd;
	if (l->slave_fd[0] > scan_limit)
	    scan_limit = l->slave_fd[0];
	scan_limit++;

	SYSCALL_P4(nfds, select(scan_limit, &read_fds, 0, 0, 0));
	if (nfds < 0)
	    p4_error("listener select", nfds);
	if (nfds == 0)
	    p4_dprintfl(70, "select timeout\n");

	fd = 0;
	for (i = 0; i < nfds && !done; i++)
	{
	    /* Advance fd to the next ready descriptor we care about. */
	    while (fd < scan_limit)
	    {
		if (FD_ISSET(fd, &read_fds))
		{
		    if (fd == l->listening_fd || fd == l->slave_fd[0])
			break;
		}
		fd++;
	    }
	    p4_dprintfl(70, "got fd=%d listening_fd=%d slave_fd=%d\n",
			fd, l->listening_fd, l->slave_fd[0]);

	    /* We use |= to ensure that after the loop, we haven't lost any
	       "done" messages.  There really are some nasty race conditions
	       here, and all this does is cause us to NOT lose a "DIE"
	       message */
	    if (fd == l->listening_fd)
		done |= process_connect_request(fd);
	    else if (fd == l->slave_fd[0])
		done |= process_slave_message(fd);
	    fd++;
	}
    }

    close(l->listening_fd);
    p4_dprintfl(70, "exit listener\n");
    exit(0);
}

/*
 * Handle one incoming connection on the listening socket `fd`.
 *
 * Accepts the connection and reads a single slave_listener_msg from it
 * (net_recv_timeout is called with a timeout argument of 10).  On EOF or
 * a message of the wrong length the connection is closed and the request
 * is dropped.  For CONNECTION_REQUEST the target process is signalled
 * with LISTENER_ATTN_SIGNAL, the message is forwarded over
 * listener_info->slave_fd[0], and the listener blocks until the slave
 * answers with an IGNORE_THIS message.
 *
 * Returns P4_FALSE on every path (rc is never set to anything else), so
 * a connect request never terminates the listener loop.
 */
static P4BOOL process_connect_request(int fd)
{
    struct slave_listener_msg msg;
    int type, msglen;
    int reply_type;
    int connection_fd, slave_fd;
    int from, lport, to_pid, to;
    P4BOOL rc = P4_FALSE;

    p4_dprintfl(70, "processing connect check/request on %d\n", fd);

    connection_fd = net_accept(fd);

    /* sizeof yields a size_t; cast so the argument matches %d */
    p4_dprintfl(70, "accepted on connection_fd=%d reading size=%d\n",
		connection_fd, (int) sizeof(msg));

    /* We originally used net_recv here, but there is the chance that a
       bogus message arrives.  In that case, we read, discard, and close
       the connection.  We detect a bogus message by either a timeout or
       invalid message type (we should switch to a session-specific
       message cookie).  Because we need a timeout, we can't use net_recv.
       Since we don't need a very complex receive message, this isn't such
       a bad thing */
    if ((msglen = net_recv_timeout(connection_fd, &msg, sizeof(msg), 10))
	== PRECV_EOF || msglen != sizeof(msg))
    {
	close(connection_fd);
	return (P4_FALSE);
    }

    type = p4_n_to_i(msg.type);
    switch (type)
    {
      case IGNORE_THIS:
	p4_dprintfl(70, "got IGNORE_THIS\n");
	break;

      case CONNECTION_REQUEST:
	from = p4_n_to_i(msg.from);
	to_pid = p4_n_to_i(msg.to_pid);
	to = p4_n_to_i(msg.to);
	lport = p4_n_to_i(msg.lport);
	p4_dprintfl(70, "connection_request2: poking slave: from=%d lport=%d to_pid=%d to=%d\n",
		    from, lport, to_pid, to);

	slave_fd = listener_info->slave_fd[0];
	if (kill(to_pid, LISTENER_ATTN_SIGNAL) == -1)
	{
	    p4_dprintf("Listener: Unable to interrupt client pid=%d.\n",
		       to_pid);
	    break;
	}
	net_send(slave_fd, &msg, sizeof(msg), P4_FALSE);

	/* wait for msg from slave indicating it got connected */
	/*
	 * do not accept any more connections for slave until it has fully
	 * completed this one, i.e. do not want to interrupt it until it has
	 * handled this interrupt
	 */
	p4_dprintfl(70, "waiting for slave to handle interrupt\n");
	net_recv(slave_fd, &msg, sizeof(msg));

	/* Check that we get a valid message; for now (see p4_sock_conn/
	   handle_connection_interrupt) this is just IGNORE_THIS.  The
	   field was just received, so it is decoded with p4_n_to_i like
	   every other received field in this file. */
	reply_type = p4_n_to_i(msg.type);
	if (reply_type != IGNORE_THIS)
	{
	    p4_dprintf("received incorrect handshake message type=%d\n",
		       reply_type);
	    p4_error("slave_listener_msg: broken handshake", reply_type);
	}
	p4_dprintfl(70, "back from slave handling interrupt\n");
	break;

      default:
	p4_dprintf("invalid type %d in process_connect_request\n", type);
	break;
    }

    close(connection_fd);
    return (rc);
}

/*
 * Handle one message arriving from the slave on `fd`.
 *
 * Reads a slave_listener_msg with net_recv; EOF is reported through
 * p4_error.  A DIE message yields P4_TRUE, which makes listener() leave
 * its loop.  Any other message type is reported through p4_error; if
 * that call returns, P4_FALSE is returned.
 */
static P4BOOL process_slave_message(int fd)
{
    struct slave_listener_msg msg;
    int type;
    int from;
    P4BOOL rc = P4_FALSE;
    int status;

    status = net_recv(fd, &msg, sizeof(msg));
    if (status == PRECV_EOF)
    {
	p4_error("slave_listener_msg: got eof on fd=", fd);
    }

    type = p4_n_to_i(msg.type);
    from = p4_n_to_i(msg.from);

    switch (type)
    {
      case DIE:
	p4_dprintfl(70, "received die msg from %d\n", from);
	rc = P4_TRUE;
	break;

      default:
	p4_dprintf("received unknown message type=%d from=%d\n", type, from);
	p4_error("slave_listener_msg: unknown message type", type);
	break;
    }

    return (rc);
}

#else /* def THREAD_LISTENER */
/*
 * The thread listener logic is quite different from the process listener
 * logic.  This takes advantage of the fact that the thread is in the same
 * process.  The algorithm is this:
 *   Let L be the listener thread and P be the "process"/user thread
 *   To connect, P sends a message to its OWN listener, using the pipe
 *   between them (this allows L to use a select to wait for work to do).
 *   P then waits for a message back down the pipe that indicates that the
 *   connection is ready.  It may get messages about other connections
 *   becoming ready while it is waiting.
 *
 *   L selects on the pipe to P and the external connection socket.
 *   If it gets a request from P, it checks the connection table; if
 *   the connection has already been made, it ignores the request (since
 *   the request-ready message is already in the pipe).  Otherwise, it
 *   creates a new socket and contacts the remote listener.
 *
 *   If the rank of L is LOWER than the rank of the remote L, this is the
 *   socket that will be used for the connection.  Once the remote listener
 *   accepts the connection, BOTH listeners (local and remote) transfer the
 *   socket fd into the connection tables, set the connection to EST, and
 *   send a message down the pipe to P.
 *
 *   If the rank of L is higher than the rank of the remote L, a message is
 *   sent asking the remote (lower rank) L to establish a connection.
 *   The socket used for this request is closed when the connection is
 *   established.  This is the only time a socket is created and later closed.
 *
 *   Because this is so different from the process listener, there is a
 *   separate establish_connection routine.
 *
 * Why choose the lower rank to establish the connection?  Because the
 * first round of connections is from the master, at rank 0.
Additional * connections as part of the initial distribution tree are also from low * to high rank. This reduces the number of connections that are made. */P4VOID thread_listener(){ struct slave_listener_msg msg; int type; int connection_fd; int from, lport, to_pid, to;/* int do_conn; P4BOOL rc = P4_FALSE; struct proc_info *from_pi; */ fd_set read_fds; int nfds, nfds_in; int msglen; p4_dprintfl(70,"TL: thread listener starting\n"); while(1) { p4_dprintfl(70, "TL: thread listener starting select on fd=%d port=%d\n", p4_global->listener_fd,p4_global->listener_port); FD_ZERO(&read_fds); FD_SET(p4_global->listener_fd, &read_fds); FD_SET(listener_info->slave_fd[0], &read_fds); nfds_in = p4_global->listener_fd; if (listener_info->slave_fd[0] > nfds_in) nfds_in = listener_info->slave_fd[0]; nfds_in++; SYSCALL_P4(nfds, select(nfds_in, &read_fds, 0, 0, 0)); if (nfds < 0) p4_error("listener select", nfds); if (nfds == 0) { p4_dprintfl(70, "TL: select timeout\n"); continue; } /* Process remote connection requests first */ if (FD_ISSET(p4_global->listener_fd, &read_fds)) { /* Accept connection, get message */ p4_dprintfl( 70, "TL: starting accept\n" ); connection_fd = net_accept(p4_global->listener_fd); p4_dprintfl(70, "TL: thread listener accepted on %d, got connection_fd=%d\n", p4_global->listener_fd, connection_fd); if ((msglen = net_recv_timeout(connection_fd, &msg, sizeof(msg),10)) == PRECV_EOF) { p4_dprintf("TL: thread listener detected EOF on fd=%d\n", connection_fd); p4_error("thread listener detected EOF",-1); } if (msglen != sizeof(msg)) { p4_dprintf("TL: message was wrong size (%d)\n", msglen ); close(connection_fd); } type = p4_n_to_i(msg.type); switch (type) { case IGNORE_THIS: p4_dprintfl(70, "TL: got IGNORE_THIS\n"); break; case CONNECTION_REQUEST: from = p4_n_to_i(msg.from); to_pid = p4_n_to_i(msg.to_pid); to = p4_n_to_i(msg.to); lport = p4_n_to_i(msg.lport); if (lport != -1) { /* Message from non-threaded listener! 
*/ } p4_dprintfl(70, "TL: got connection_request: from=%d lport=%d to_pid=%d to=%d\n", from, lport, to_pid, to); if (p4_local->conntab[from].type == CONN_REMOTE_NON_EST) { /* Establish the connection */ /* p4_local->conntab[from].type = CONN_REMOTE_OPENING; */ p4_dprintfl(70, "TL: connection now opening for %d\n", from ); if (p4_local->my_id < from) { int new_connection_fd; p4_dprintfl(90,"TL: myid < from, myid = %d, from = %d\n", p4_local->my_id,from); /* Create a connection back to "from". We could use the same socket, but using the same request code is easier */ new_connection_fd = request_connection(from); if (new_connection_fd < 0) { p4_error( "Could not create new connection", new_connection_fd ); } close( connection_fd ); connection_fd = new_connection_fd; /* We now have the socket for the connection */ } /* This is the new socket. Just keep it. */ p4_local->conntab[from].port = connection_fd; p4_local->conntab[from].same_data_rep =
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -