📄 p4_sock_sr.c
字号:
/* Now, what to do ? */ } } return (P4_FALSE);}P4BOOL sock_msg_avail_on_fd(int fd){ int i, rc, nfds; struct timeval tv; fd_set read_fds; char tempbuf[2]; rc = P4_FALSE; tv.tv_sec = 0; tv.tv_usec = 0; FD_ZERO(&read_fds); FD_SET(fd, &read_fds); SYSCALL_P4(nfds, select(fd+1, &read_fds, 0, 0, &tv)); if (nfds == -1) { p4_dprintfl(20,"sock_msg_avail_on_fd selected on %d\n", fd); p4_error("sock_msg_avail_on_fd select", nfds); } if (nfds) /* true even for eof */ { /* see if data is on the socket or merely an eof condition */ /* this should not loop long because the select succeeded */ while ((rc = recv(fd, tempbuf, 1, MSG_PEEK)) == -1) ; if (rc == 0) /* if eof */ { /* eof; a process has closed its socket; may have died */ for (i = 0; i < p4_global->num_in_proctable; i++) if (p4_local->conntab[i].port == fd) { p4_local->conntab[i].type = CONN_REMOTE_DYING; /* p4_error("tried to read from dead process",-1); */ } } else rc = P4_TRUE; } return (rc);}#ifdef CAN_DO_XDRint xdr_recv(fd, rmsg)int fd;struct p4_msg *rmsg;{ xdrproc_t xdr_proc; XDR *xdr_dec; char *xdr_buff, *msg; int n; int msg_len = 0, nbytes_read = 0; int xdr_elsize, els_per_buf, xdr_numels;/* Begin bugfix, compute msg_len correct, Rolf Rabenseifner,04SEP97*/ int elsize; /* End bugfix, compute msg_len correct, Rolf Rabenseifner,04SEP97*/ int xdr_len1, len_bytes;/* See new test (should only need the USE_U_INT_FOR_XDR). Also, other int args to xdr_array should also be u_int */#if defined(SUN_SOLARIS) || defined(CRAY) || defined(SGI) || \ defined(USE_U_INT_FOR_XDR) u_int xdr_len;#elif defined(USE_UNSIGNED_INT_FOR_XDR) unsigned int xdr_len;#else int xdr_len;#endif msg = (char *) &(rmsg->msg); xdr_dec = &(p4_local->xdr_dec); xdr_buff = p4_local->xdr_buff; switch (rmsg->data_type) { case P4INT: xdr_proc = (xdrproc_t) xdr_int; xdr_elsize = XDR_INT_LEN;/* Begin bugfix, compute xdr_numels correct, Rolf Rabenseifner,04SEP97*/ elsize = sizeof(int);/* End bugfix, compute xdr_numels correct, Rolf Rabenseifner,04SEP97*/ break; case P4LNG: xdr_proc = (xdrproc_t) xdr_long; xdr_elsize = XDR_LNG_LEN;/* Begin bugfix, compute xdr_numels correct, Rolf Rabenseifner,04SEP97*/ elsize = sizeof(long);/* End bugfix, compute xdr_numels correct, Rolf Rabenseifner,04SEP97*/ break; case P4FLT: xdr_proc = (xdrproc_t) xdr_float; xdr_elsize = XDR_FLT_LEN;/* Begin bugfix, compute xdr_numels correct, Rolf Rabenseifner,04SEP97*/ elsize = sizeof(float);/* End bugfix, compute xdr_numels correct, Rolf Rabenseifner,04SEP97*/ break; case P4DBL: xdr_proc = (xdrproc_t) xdr_double; xdr_elsize = XDR_DBL_LEN;/* Begin bugfix, compute xdr_numels correct, Rolf Rabenseifner,04SEP97*/ elsize = sizeof(double);/* End bugfix, compute xdr_numels correct, Rolf Rabenseifner,04SEP97*/ break; default: p4_dprintf("xdr_recv: invalid data type %d\n", rmsg->data_type); return (-1); } xdr_numels = rmsg->len; els_per_buf = (XDR_BUFF_LEN - XDR_PAD) / xdr_elsize; while (xdr_numels > 0) { if (xdr_numels > els_per_buf) xdr_len = els_per_buf; else xdr_len = xdr_numels; xdr_len1 = xdr_len; /* remember xdr_len */ len_bytes = (xdr_len * xdr_elsize) + XDR_PAD; p4_dprintfl(90, "xdr_recv: reading %d bytes from %d\n", len_bytes, fd); n = net_recv(fd, xdr_buff, len_bytes); p4_dprintfl(90, "xdr_recv: read %d bytes \n", n); if (n < 0) { p4_error("xdr_recv net_recv", n); } if (!xdr_setpos(xdr_dec, 0)) { p4_dprintf("xdr_recv: xdr_setpos failed\n"); return (-1); } if (!xdr_array(xdr_dec, &msg, &xdr_len, XDR_BUFF_LEN, xdr_elsize, xdr_proc)) { p4_dprintf("xdr_recv: xdr_array failed\n"); return (-1); } nbytes_read += len_bytes; xdr_numels -= xdr_len1;/* Begin bugfix, compute xdr_numels correct, Rolf Rabenseifner,04SEP97*/ msg = msg + xdr_len1*elsize; msg_len = msg_len + xdr_len1*elsize;/* instead of * msg = msg + len_bytes - XDR_PAD; * msg_len = msg_len + len_bytes - XDR_PAD; */ /* End bugfix, compute xdr_numels correct, Rolf Rabenseifner,04SEP97*/ } rmsg->len = msg_len; return (msg_len);}#endifP4VOID wait_for_ack( int fd ){ struct p4_msg *ack; p4_dprintfl(30, "waiting for ack \n"); ack = socket_recv_on_fd(fd); while (!(ack->ack_req & P4_ACK_REPLY_MASK)) { queue_p4_message(ack, p4_local->queued_messages); ack = socket_recv_on_fd(fd); } ack->msg_id = (-1); free_p4_msg(ack); p4_dprintfl(30, "received ack from %d\n", ack->from);}P4VOID send_ack( int fd, int to ){ struct p4_net_msg_hdr ack; p4_dprintfl(30, "sending ack to %d\n", to); ack.from = p4_i_to_n(p4_get_my_id()); ack.data_type = p4_i_to_n(P4NOX); ack.msg_len = p4_i_to_n(0); ack.to = p4_i_to_n(to); ack.ack_req = p4_i_to_n(P4_ACK_REPLY_MASK); net_send(fd, &ack, sizeof(ack), P4_FALSE); p4_dprintfl(30, "sent ack to %d\n", to);}P4VOID shutdown_p4_socks()/* Shutdown all sockets we know about discarding info in either direction. */{ int i; if (!p4_local) /* Need info to be defined */ return; if (!p4_local->conntab) return; if (p4_local->my_id == LISTENER_ID) return; for (i = 0; i < p4_num_total_ids(); i++) if (p4_local->conntab[i].type == CONN_REMOTE_EST) {#ifndef SHUT_RDWR/* Posix 1g defines SHUT_RDWR */#define SHUT_RDWR 2#endif (P4VOID) shutdown(p4_local->conntab[i].port, SHUT_RDWR); /* Do we really want to do a close here ? */ (P4VOID) close(p4_local->conntab[i].port); }}/* * Modified socket messages available. This looks for the important case * of either read on ANY or write on one specified socket. * Return value is the fd of an available socket, with priority given to * the write fd. (i.e., if can write, return that fd first). Return * -2 if no socket is ready (only if q_block is false) * * Since the sockets are bi-directional, return -1 for the write_fd ready. * * If q_block is true, block until some fd is ready. */int p4_sockets_ready( int write_fd, int q_block ){ int i, fd, nfds; int ndown = 0; int max_fd; struct timeval tv, *tv_p; fd_set read_fds; fd_set write_fds; /* The while loop is necessary in case an EINTR causes the available connections to change. Note that this may need more changes for use with the threaded listener */ do { FD_ZERO(&read_fds); FD_ZERO(&write_fds); FD_SET(write_fd,&write_fds); max_fd = write_fd; if (p4_global && p4_local && p4_local->conntab) { /* This routine may, in some rare cases, be called during the p4_initenv step before the p4_global and p4_local structures are fully initialized */ for (i = 0; i < p4_global->num_in_proctable; i++) { if (p4_local->conntab[i].type == CONN_REMOTE_EST) { fd = p4_local->conntab[i].port; FD_SET(fd,&read_fds); if (fd > max_fd) max_fd = fd; } else if (p4_local->conntab[i].type == CONN_REMOTE_DYING) { /* We need to detect that some are down... */ ndown++; /* Now, what to do ? */ } } } /* Now we have found the fds to wait on */ if (q_block) { /* Block forever */ tv_p = 0; } else { /* don't block at all */ tv.tv_sec = 0; tv.tv_usec = 0; tv_p = &tv; } nfds = select(max_fd + 1, &read_fds, &write_fds, 0, tv_p); } while (nfds == -1 && (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)); if (nfds == -1) { p4_dprintfl(20,"p4_sockets_ready selected on %d\n", write_fd); p4_error("p4_sockets_ready select", nfds); } /* First, check the write fd */ if (FD_ISSET(write_fd,&write_fds)) { return -1; } /* Otherwise, find an fd and make sure that we can really read on it. nfds is the number of available fds */ if (nfds == 0) return -2; for (i = 0; i < p4_global->num_in_proctable; i++) { if (p4_local->conntab[i].type == CONN_REMOTE_EST) { fd = p4_local->conntab[i].port; if (FD_ISSET(fd,&read_fds)) { char tempbuf[2]; int rc; /* see if data is on the socket or merely an eof condition */ /* this should not loop long because the select succeeded */ while ((rc = recv(fd, tempbuf, 1, MSG_PEEK)) == -1) { /* Should collect a count of the times this occurs */ ; } if (rc == 0) /* if eof */ { /* eof; a process has closed its socket; may have died */ for (i = 0; i < p4_global->num_in_proctable; i++) if (p4_local->conntab[i].port == fd) { p4_local->conntab[i].type = CONN_REMOTE_DYING; /* p4_error("tried to read from dead process",-1); */ } } else return fd; } } } return -2;}/* Look for a "close this connection for connection i. This reads only a header if there is any data; since we are closing the connection, any other messages would be an error */void p4_look_for_close( int i ){ struct p4_net_msg_hdr nmsg; int fd, n; fd = p4_local->conntab[i].port; p4_dprintfl( 90, "Looking for close message for conn %d (fd %d)\n", i, fd ); if ( sock_msg_avail_on_fd( fd ) ) { /* Read just a header */ n = net_recv(fd, &nmsg, sizeof(struct p4_net_msg_hdr)); if (p4_n_to_i(nmsg.ack_req) & P4_CLOSE_MASK) { p4_dprintfl(20,"Received looked-for close connection on %d (fd %d)\n", i, fd ); p4_local->conntab[i].type = CONN_REMOTE_CLOSED; } else { p4_dprintfl(90, "Unexpected message seen while closing socket\n" ); } }}/* Wait until a message is available from any source. This includes the listener. Returns one if select found something. */int p4_wait_for_socket_msg( int is_blocking ){ int i, fd, nfds, max_fd; struct timeval tv; fd_set read_fds; int nactive; int timeout_sec = 9; if (!is_blocking) timeout_sec = 0; while (1) { tv.tv_sec = timeout_sec; tv.tv_usec = 0; /* RMB */ FD_ZERO(&read_fds); max_fd = -1;#ifdef THREAD_LISTENER p4_dprintfl(70,"p4_wait_for_socket_msg: p4_local->listener_fd is %d\n", p4_local->listener_fd); FD_SET(p4_local->listener_fd, &read_fds); max_fd = p4_local->listener_fd;#endif nactive = 0; for (i = 0; i < p4_global->num_in_proctable; i++) { if (p4_local->conntab[i].type == CONN_REMOTE_EST) { fd = p4_local->conntab[i].port; FD_SET(fd, &read_fds); if (fd > max_fd) max_fd = fd; nactive++; } } /* If there is only one process, there will NEVER be any active connections. Question: does this cover the case of multiple processes but little communication between them, since the connections are established dynamically? */#ifndef P4_WITH_MPD if (!nactive && p4_global->num_in_proctable > 1) { /* There are no active connections! Let some other routine handle this. */ return 1; }#endif /* Run select; if interrupted, get read_fds (in case a connection has occurred) and restart the connection */ nfds = select(max_fd + 1, &read_fds, 0, 0, &tv); if (is_blocking) timeout_sec = 9; if (nfds == -1 && errno == EINTR) continue; if (nfds) { return 1; } if (!is_blocking) { /* Did not find anything and non-blocking. */ return 0; } }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -