📄 p4_sock_conn.c
字号:
{ struct proc_info *my_pi, *dest_pi; char *my_host, *dest_host; int my_id; struct slave_listener_msg msg; int connection_fd; int dest_listener_con_fd; int my_listener, dest_listener; int new_listener_port, new_listener_fd; int num_tries; P4_BLOCK_SIG_DECL; P4_BLOCK_SIG(LISTENER_ATTN_SIGNAL); /* Get some initial information */ my_id = p4_get_my_id(); my_pi = get_proc_info(my_id); my_host = my_pi->host_name; my_listener = my_pi->port; dest_pi = get_proc_info(dest_id); dest_host = dest_pi->host_name; dest_listener = dest_pi->port; p4_dprintfl(70, "request_connection: my_id=%d my_host=%s my_listener=%d dest_id=%d dest_host=%s dest_listener=%d\n", my_id, my_host, my_listener, dest_id, dest_host, dest_listener); /* Have we already connected?? */ if (p4_local->conntab[dest_id].type == CONN_REMOTE_EST) { p4_dprintfl(70,"request_connection %d: already connected\n", dest_id); P4_RELEASE_SIG(LISTENER_ATTN_SIGNAL); return; } p4_dprintfl(70, "enter loop to connect to dest listener %s\n",dest_host); /* Connect to dest listener */ num_tries = 1; p4_has_timedout( 0 ); while((dest_listener_con_fd = net_conn_to_listener(dest_host,dest_listener,1)) == -1) { num_tries++; if (p4_has_timedout( 1 )) { p4_error( "Timeout in establishing connection to remote process", 0 ); } } p4_dprintfl(70, "conn_to_proc_contd: connected after %d tries, dest_listener_con_fd=%d\n",num_tries, dest_listener_con_fd); /* Setup a listener to accept the connection to dest_id */ net_setup_anon_listener(1, &new_listener_port, &new_listener_fd); /* Construct a connection request message */ msg.type = p4_i_to_n(CONNECTION_REQUEST); msg.from = p4_i_to_n(my_id); msg.lport = p4_i_to_n(new_listener_port); msg.to = p4_i_to_n(dest_id); msg.to_pid = p4_i_to_n(dest_pi->unix_id); /* Send it to dest_id's listener */ p4_dprintfl(70, "request_connection: sending CONNECTION_REQUEST to %d on fd=%d size=%d\n", dest_id,dest_listener_con_fd,sizeof(msg)); net_send(dest_listener_con_fd, &msg, sizeof(msg), P4_FALSE); p4_dprintfl(70, "request_connection: sent CONNECTION_REQUEST for %d (pid %d) to dest_listener on fd %d\n", dest_id, dest_pi->unix_id, dest_listener_con_fd); if (my_id < dest_id) { /* Wait for the remote process to connect to me */ p4_dprintfl(70, "request_connection: waiting for accept from %d on fd=%d, port=%d\n", dest_id, new_listener_fd, new_listener_port); /* This needs a timeout? ??? */ connection_fd = net_accept(new_listener_fd); p4_dprintfl(70, "request_connection: accepted from %d on %d\n", dest_id, connection_fd); /* Add the connection to the table */ p4_local->conntab[dest_id].port = connection_fd; p4_local->conntab[dest_id].same_data_rep = same_data_representation(p4_local->my_id,dest_id); /* Requires write ordering in the thread */ p4_local->conntab[dest_id].type = CONN_REMOTE_EST; } close(dest_listener_con_fd); /* Now release the listener connections */ close(new_listener_fd); P4_RELEASE_SIG(LISTENER_ATTN_SIGNAL); p4_dprintfl(70, "request_connection: finished connecting\n"); return;}/* sig isn't used except to match the function prototypes for POSIX signal handlers */P4VOID handle_connection_interrupt( int sig ){ struct slave_listener_msg msg; int type; int listener_fd; int to, to_pid, from, lport; int connection_fd; struct proc_info *from_pi; int myid = p4_get_my_id(); int num_tries; static int in_handler = 0; /* There is a small chance that we'll be in the handler when another signal is delivered. Since the listener will send a signal every .1 seconds until there is a response, we can simply return. This test does have a race condition, but it is very small, and we're going to ignore it. In fact, if signal blocking works correctly, we should *never* enter the handler while we are already in it. Perhaps this test should abort if in_handler? */ if (in_handler) return; in_handler = 1; listener_fd = p4_local->listener_fd; p4_dprintfl(70, "Inside handle_connection_interrupt, listener_fd=%d\n", listener_fd);#ifdef USE_NONBLOCKING_LISTENER_SOCKETS /* This parameter gives the number of attempts to read before deciding that something has gone wrong */# define MAX_DRY_ITERATIONS 1000000 /* * Must read non-blocking due to race conditions with using * signals as IPC mechanism. See the fcntl near get_pipe where * these are created. * * However, this should not loop endlessly. If a signal has been * delivered, the listener is trying to talk to us. To catch * failures in the listener or other logic (for example, not all of the * listener_fd's were properly set in the 1.2.3 release of MPICH). */ { int it_count = 0; for (;;) { int cc = read(listener_fd, &msg, sizeof(msg)); if (cc == 0) p4_error("handle_connection_interrupt: EOF from listener", 0); if (cc < 0) { if (errno == EAGAIN || errno == EWOULDBLOCK) { it_count ++; if (it_count > MAX_DRY_ITERATIONS) { /* Temporary */ /* printf( "zillion iterations, resetting\n"); it_count = 0; p4_error("handled_connection_interrupt: listener is not sending", -1 ); */ /* give up; the listener will try again */ in_handler = 0; return; } continue; } p4_error("handle_connection_interrupt: read listener", cc); } /* these should be atomic: AF_UNIX, AF_STREAM */ if (cc != sizeof(msg)) p4_error("handle_connection_interrupt: short read from listener", 0); break; } }#else if (net_recv(listener_fd, &msg, sizeof(msg)) == PRECV_EOF) { p4_dprintf("OOPS: got eof in handle_connection_interrupt\n"); in_handler = 0; return; }#endif type = p4_n_to_i(msg.type); if (type == WAKEUP_SLAVE) { /* Ignore and return. This may be a poke for a message that we've already processed. In case these wakeups have piled up, we try to read again, and then only if there is nothing, do we return */#ifdef USE_NONBLOCKING_LISTENER_SOCKETS int cc; while ((cc = read(listener_fd, &msg, sizeof(msg))) > 0) { if (cc != sizeof(msg)) p4_error("handle_connection_interrupt: short read from listener", 0); type = p4_n_to_i(msg.type); if (type != WAKEUP_SLAVE) break; } if (cc <= 0) { in_handler = 0; return; } /* Otherwise, drop through with the new message */#else in_handler = 0; return;#endif } if (type == KILL_SLAVE) { msg.type = p4_i_to_n(IGNORE_THIS); p4_dprintfl(70, "handle_connection_interrupt: sending IGNORE_THIS to my_listener\n"); /* send msg to listener indicating I made the connection */ net_send(listener_fd, &msg, sizeof(msg), P4_FALSE); p4_dprintfl(99, "handle_connection_interrupt: exiting due to DIE msg\n"); /* Immediate exit on KILL_SLAVE */ exit(0); } if (type != CONNECTION_REQUEST) { p4_dprintf("handle_connection_interrupt: invalid type %d\n", type); in_handler = 0; return; } to = p4_n_to_i(msg.to); from = p4_n_to_i(msg.from); to_pid = p4_n_to_i(msg.to_pid); lport = p4_n_to_i(msg.lport); p4_dprintfl(70, "handle_connection_interrupt: msg contents: to=%d from=%d to_pid=%d lport=%d\n", to, from, to_pid, lport); /* If we're already connected, forget about the interrupt. */ if (p4_local->conntab[from].type != CONN_REMOTE_EST) { if (myid < from) { /* see if I have already started this connection */ p4_dprintfl(90,"myid < from, myid = %d, from = %d\n",myid,from); if (p4_global->dest_id[myid] != from) request_connection(from); } else { /* Get the information for the process we're connecting to */ from_pi = &(p4_global->proctable[from]); /* Connect to the waiting process */ p4_dprintfl(70, "connecting to port...\n"); num_tries = 1; /* connect to the requesting process, who is listening */ p4_dprintfl(70,"handling connection interrupt: connecting to %s\n",from_pi->host_name); p4_has_timedout( 0 ); while ((connection_fd = net_conn_to_listener(from_pi->host_name,lport,1)) == -1) { num_tries++; if (p4_has_timedout( 1 )) { p4_error( "Timeout in establishing connection to remote process", 0 ); } } p4_dprintfl(70, "handling connection interrupt: connected after %d tries, connection_fd=%d host = %s\n", num_tries, connection_fd, from_pi->host_name); /* We're connected, so we can add this connection to the table */ p4_local->conntab[from].port = connection_fd; p4_local->conntab[from].same_data_rep = same_data_representation(p4_local->my_id,from); /* Note that this requires write ordering in the threads */ p4_local->conntab[from].type = CONN_REMOTE_EST; p4_dprintfl(70, "marked as established fd=%d from=%d\n", connection_fd, from); } } else { p4_dprintfl(70,"ignoring interrupt from %d\n",from); } msg.type = p4_i_to_n(IGNORE_THIS); p4_dprintfl(70, "handle_connection_interrupt: sending IGNORE_THIS to my_listener\n"); /* send msg to listener indicating I made the connection */ net_send(listener_fd, &msg, sizeof(msg), P4_FALSE); p4_dprintfl(70, "handle_connection_interrupt: exiting handling intr from %d\n",from); /* If the return from this is SIG_DFL, then there is a problem ... */ /* The following re-establishes the signal handler, which is needed on systems where the handler is reset to SIG_DFL when it is triggered. Such systems are *broken*, since there is no good way to avoid the resulting race conditions. Unfortunately, we must work with those systems. We could reset the signal handler only on systems that require it. However, this should be safe for all cases. */ SIGNAL_P4(LISTENER_ATTN_SIGNAL, handle_connection_interrupt); in_handler = 0;}#endif#endif /* THREAD_LISTENER */#ifdef P4_WITH_MPD/* routines copied from MPD (mpdlib.c) and renamed as p4 routines. These are all that is necessary to communicate with the manager *//* FIXME: These should be part of the BNR library, not P4. */void p4_printf( int print_flag, char *fmt, ... ){ va_list ap; if (print_flag) { fprintf( stderr, "[%s]: ", whoami_p4 ); va_start( ap, fmt ); vfprintf( stderr, fmt, ap ); va_end( ap ); fflush( stderr ); }}int p4_read_line( int fd, char *buf, int maxlen ){ int n, rc; char c, *ptr; ptr = buf; for ( n = 1; n < maxlen; n++ ) { again: if ( ( rc = read( fd, &c, 1 ) ) == 1 ) { *ptr++ = c; if ( c == '\n' ) /* note \n is stored, like in fgets */ break; } else if ( rc == 0 ) { if ( n == 1 ) return( 0 ); /* EOF, no data read */ else break; /* EOF, some data read */ } else { if ( errno == EINTR ) goto again; return ( -1 ); /* error, errno set by read */ } } *ptr = 0; /* null terminate, like fgets */ return( n );}int p4_parse_keyvals( char *st ){ char *p, *keystart, *valstart; if ( !st ) return( -1 ); p4_keyval_tab_idx = 0; p = st; while ( 1 ) { while ( *p == ' ' ) p++; /* got non-blank */ if ( *p == '=' ) { p4_printf( 1, "p4_parse_keyvals: unexpected = at character %d in %s\n", p - st, st ); return( -1 ); } if ( *p == '\n' || *p == '\0' ) return( 0 ); /* normal exit */ /* got normal character */ keystart = p; /* remember where key started */ while ( *p != ' ' && *p != '=' && *p != '\n' && *p != '\0' ) p++; if ( *p == ' ' || *p == '\n' || *p == '\0' ) { p4_printf( 1, "p4_parse_keyvals: unexpected key delimiter at character %d in %s\n", p - st, st ); return( -1 ); } strncpy( p4_keyval_tab[p4_keyval_tab_idx].key, keystart, p - keystart ); p4_keyval_tab[p4_keyval_tab_idx].key[p - keystart] = '\0'; /* store key */ valstart = ++p; /* start of value */ while ( *p != ' ' && *p != '\n' && *p != '\0' ) p++; strncpy( p4_keyval_tab[p4_keyval_tab_idx].value, valstart, p - valstart ); p4_keyval_tab[p4_keyval_tab_idx].value[p - valstart] = '\0'; /* store value */ p4_keyval_tab_idx++; if ( *p == ' ' ) continue; if ( *p == '\n' || *p == '\0' ) return( 0 ); /* value has been set to empty */ }} void p4_dump_keyvals(){ int i; for (i=0; i < p4_keyval_tab_idx; i++) p4_printf(1, " %s=%s\n",p4_keyval_tab[i].key, p4_keyval_tab[i].value);}char *p4_getval( char *keystr, char *valstr ){ int i; for (i=0; i < p4_keyval_tab_idx; i++) { if ( strcmp( keystr, p4_keyval_tab[i].key ) == 0 ) { strcpy( valstr, p4_keyval_tab[i].value ); return valstr; } } valstr[0] = '\0'; return NULL;}void p4_chgval( char *keystr, char *valstr ){ int i; for ( i = 0; i < p4_keyval_tab_idx; i++ ) { if ( strcmp( keystr, p4_keyval_tab[i].key ) == 0 ) strcpy( p4_keyval_tab[i].value, valstr ); }}#define END ' '#define ESC_END '"'#define ESC '\\'#define ESC_ESC '\''void p4_stuff_arg( char arg[], char stuffed[]){ int i,j; for (i=0, j=0; i < strlen(arg); i++) { switch (arg[i]) { case END: stuffed[j++] = ESC; stuffed[j++] = ESC_END; break; case ESC: stuffed[j++] = ESC; stuffed[j++] = ESC_ESC; break; default: stuffed[j++] = arg[i]; } } stuffed[j] = '\0';}void p4_destuff_arg(char stuffed[], char arg[]){ int i,j; i = 0; j = 0; while (stuffed[i]) { /* END pulled off in parse */ switch (stuffed[i]) { case ESC: i++; switch (stuffed[i]) { case ESC_END: arg[j++] = END; i++; break; case ESC_ESC: arg[j++] = ESC; i++; break; } break; default: arg[j++] = stuffed[i++]; } } arg[j] = '\0';}#endif
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -