📄 p4_sock_sr.c
字号:
#include "p4.h"#include "p4_sys.h"/* Look for u_int xdr_len; some systems define as u_int, others as int */#ifdef CAN_DO_XDRint xdr_send(type, from, to, msg, len, data_type, ack_req)char *msg;int type, from, to, len, data_type, ack_req;{ int nbytes_written = 0; int flag, fd, myid; struct p4_net_msg_hdr nmsg; XDR *xdr_enc; xdrproc_t xdr_proc; char *xdr_buff; int xdr_elsize, els_per_buf, xdr_numels;/* Begin bugfix, compute xdr_numels correct, Rolf Rabenseifner,04SEP97*/ int elsize; /* End bugfix, compute xdr_numels correct, Rolf Rabenseifner,04SEP97*/ int xdr_len1, len_bytes;/* See new test (should only need the USE_U_INT_FOR_XDR). Also, other int args to xdr_array should also be u_int */#if defined(SUN_SOLARIS) || defined(CRAY) || defined(SGI) || \ defined(USE_U_INT_FOR_XDR) u_int xdr_len;#elif defined(USE_UNSIGNED_INT_FOR_XDR) unsigned int xdr_len;#else int xdr_len;#endif p4_dprintfl(20, "sending msg of type %d from %d to %d via xdr\n", type,from,to); myid = p4_get_my_id(); fd = p4_local->conntab[to].port; nmsg.msg_type = p4_i_to_n(type); nmsg.to = p4_i_to_n(to); nmsg.from = p4_i_to_n(from); nmsg.imm_from = p4_i_to_n(p4_local->my_id); p4_dprintfl(30,"setting imm_from: to = %d, from = %d, imm_from = %d, p4_i_to_n(imm_from) =%d in xdr_send\n", to, from, p4_local->my_id, p4_i_to_n(p4_local->my_id)); switch (data_type) { case P4INT: xdr_proc = (xdrproc_t) xdr_int; xdr_elsize = XDR_INT_LEN;/* Begin bugfix, compute xdr_numels correct, Rolf Rabenseifner,04SEP97*/ elsize = sizeof(int);/* End bugfix, compute xdr_numels correct, Rolf Rabenseifner,04SEP97*/ break; case P4LNG: xdr_proc = (xdrproc_t) xdr_long; xdr_elsize = XDR_LNG_LEN;/* Begin bugfix, compute xdr_numels correct, Rolf Rabenseifner,04SEP97*/ elsize = sizeof(long);/* End bugfix, compute xdr_numels correct, Rolf Rabenseifner,04SEP97*/ break; case P4FLT: xdr_proc = (xdrproc_t) xdr_float; xdr_elsize = XDR_FLT_LEN;/* Begin bugfix, compute xdr_numels correct, Rolf Rabenseifner,04SEP97*/ elsize = sizeof(float);/* End bugfix, compute xdr_numels correct, Rolf Rabenseifner,04SEP97*/ break; case P4DBL: xdr_proc = (xdrproc_t) xdr_double; xdr_elsize = XDR_DBL_LEN;/* Begin bugfix, compute xdr_numels correct, Rolf Rabenseifner,04SEP97*/ elsize = sizeof(double);/* End bugfix, compute xdr_numels correct, Rolf Rabenseifner,04SEP97*/ break; default: p4_dprintf("xdr_send: invalid data type %d\n", data_type); return (-1); }/* Begin bugfix, compute xdr_numels correct, Rolf Rabenseifner,04SEP97*/ xdr_numels = len / elsize /*instead of xdr_elsize*/;/* End bugfix, compute xdr_numels correct, Rolf Rabenseifner,04SEP97*/ nmsg.msg_len = p4_i_to_n(xdr_numels); nmsg.ack_req = p4_i_to_n(ack_req); nmsg.data_type = p4_i_to_n(data_type); flag = (myid < to) ? P4_TRUE : P4_FALSE; net_send(fd, &nmsg, sizeof(struct p4_net_msg_hdr), flag); xdr_enc = &(p4_local->xdr_enc); xdr_buff = p4_local->xdr_buff; els_per_buf = (XDR_BUFF_LEN - XDR_PAD) / xdr_elsize; while (xdr_numels > 0) { if (xdr_numels > els_per_buf) xdr_len = els_per_buf; else xdr_len = xdr_numels; xdr_len1 = xdr_len; /* remember xdr_len */ if (!xdr_setpos(xdr_enc, 0)) { p4_dprintf("xdr_send: xdr_setpos failed\n"); return (-1); } if (!xdr_array(xdr_enc, &msg, &xdr_len, XDR_BUFF_LEN, xdr_elsize, xdr_proc)) { p4_dprintf("xdr_send: xdr_array failed\n"); return (-1); } len_bytes = xdr_getpos(xdr_enc); net_send(fd, xdr_buff, len_bytes, flag); nbytes_written += len_bytes; xdr_numels -= xdr_len1; msg = msg + len_bytes - XDR_PAD; } if (ack_req & P4_ACK_REQ_MASK) { wait_for_ack(fd); } p4_dprintfl(10, "sent msg of type %d from %d to %d via xdr\n",type,from,to); return (nbytes_written);}#endifint socket_send(type, from, to, msg, len, data_type, ack_req)int type, from, to, len, data_type, ack_req;char *msg;{ int fd, flag; int sent = 0; int n; struct p4_net_msg_hdr nmsg; p4_dprintfl(20, "sending msg of type %d from %d to %d via socket\n",type,from,to); if (CHECKNODE(to) || CHECKNODE(from)) p4_error("socket_send: bad header: to/from node is out of range", to * 10000 + from); fd = p4_local->conntab[to].port; nmsg.msg_type = p4_i_to_n(type); nmsg.to = p4_i_to_n(to); nmsg.from = p4_i_to_n(from); nmsg.imm_from = p4_i_to_n(p4_local->my_id); nmsg.msg_len = p4_i_to_n(len); nmsg.ack_req = p4_i_to_n(ack_req); nmsg.data_type = p4_i_to_n(data_type); p4_dprintfl(30,"setting imm_from: to = %d, from = %d, imm_from = %d, p4_i_to_n(imm_from) =%d in socket_send\n", to, from, p4_local->my_id, p4_i_to_n(p4_local->my_id)); flag = (from < to) ? P4_TRUE : P4_FALSE; n = net_send2( fd, &nmsg, sizeof(struct p4_net_msg_hdr), msg, len, flag ); sent += n; if (ack_req & P4_ACK_REQ_MASK) { wait_for_ack(fd); } p4_dprintfl(10, "sent msg of type %d from %d to %d via socket %d\n",type,from,to,fd); return (sent);}/* Send a message to close a socket connection. Note that the partner may also have closed the socket; in that case, the write will fail but because we have set p4_local->in_wait_for_exit, no error message or action will occur */int socket_close_conn( int fd ){ struct p4_net_msg_hdr nmsg; p4_dprintfl( 10, "Closing socket on fd %d\n", fd ); p4_dprintfl( 40, "Sending close socket message\n" ); /* Most of this is ignored */ nmsg.msg_type = p4_i_to_n(0); nmsg.to = p4_i_to_n(0); /* The from fields may be tested, and it is useful to have them anyway */ nmsg.from = p4_i_to_n(p4_local->my_id); nmsg.imm_from = p4_i_to_n(p4_local->my_id); nmsg.msg_len = p4_i_to_n(0); nmsg.ack_req = p4_i_to_n(P4_CLOSE_MASK); nmsg.data_type = p4_i_to_n(0); /* This may fail if our partner has already closed the socket. In that case, we don't care. */ net_send(fd, &nmsg, sizeof(struct p4_net_msg_hdr), P4_FALSE ); /* Instead of close, consider using shutdown( fd, SHUT_WR ) if we want to allow the other side to send us data (e.g., for a clean handshake on the close connection) */ close( fd ); p4_dprintfl( 40, "Socket on fd %d closed\n", fd ); return 0;}/* This code introduces some subtle problems. The timeout on the select is needed to catch changes in the established connections, but in this case, we need EINTR (interrupted system call) from the select to just restart the call AFTER we've recomputed the read_fds. */struct p4_msg *socket_recv( int is_blocking ){#ifdef THREAD_LISTENER struct slave_listener_msg msg;#endif int i, fd, nfds, max_fd; struct p4_msg *tmsg = NULL; P4BOOL found = P4_FALSE; struct timeval tv; fd_set read_fds; int nactive; int found_cmd = 0; int timeout_sec = 9; /* If timeout_sec is not set to zero in the non-blocking case, the -comm=shared case can cause *huge* delays because this call should be polled but would otherwise block for 9 seconds */ if (!is_blocking) timeout_sec = 0; while (!found) { tv.tv_sec = timeout_sec; tv.tv_usec = 0; /* RMB */ FD_ZERO(&read_fds); max_fd = -1;#ifdef THREAD_LISTENER p4_dprintfl(70,"socket_recv: p4_local->listener_fd is %d\n", p4_local->listener_fd); FD_SET(p4_local->listener_fd, &read_fds); max_fd = p4_local->listener_fd;#endif nactive = 0; for (i = 0; !tmsg && i < p4_global->num_in_proctable; i++) { if (p4_local->conntab[i].type == CONN_REMOTE_EST) { fd = p4_local->conntab[i].port; FD_SET(fd, &read_fds); if (fd > max_fd) max_fd = fd; nactive++; } } /* If there is only one process, there will NEVER be any active connections. Question: does this cover the case of multiple processes but little communication between them, since the connections are established dynamically? */#ifndef P4_WITH_MPD if (!nactive && p4_global->num_in_proctable > 1) { /* If we read a "close" and there are no connections left, silently exit */ if (found_cmd) return 0; /* There are no active connections! If this is because the active connections have all died, then we should exit. Question: what if one connection has died "irregularly"? */ p4_dprintf("Trying to receive a message when there are no connections; Bailing out\n"); p4_wait_for_end(); exit(0); }#endif /* Run select; if interrupted, get read_fds (in case a connection has occurred) and restart the connection */ nfds = select(max_fd + 1, &read_fds, 0, 0, &tv); if (is_blocking) timeout_sec = 9; if (nfds == -1 && errno == EINTR) continue; if (nfds) {#ifdef THREAD_LISTENER if (FD_ISSET(p4_local->listener_fd,&read_fds)) { /* receive dummy msg */ net_recv(p4_local->listener_fd, &msg, sizeof(msg)); p4_dprintfl(70,"socket_recv: got dummy msg\n"); continue; }#endif for (i = 0; !tmsg && i < p4_global->num_in_proctable; i++) { if (p4_local->conntab[i].type == CONN_REMOTE_EST) { fd = p4_local->conntab[i].port; /* sock_msg_avail does *another* select and then a recv MSG_PEEK to make sure that there is really data. The net_recv in socket_recv_on_fd should do be sufficient; if not, the recv(MSG_PEAK) should be used, not sock_msg_avail_on_fd */ if (FD_ISSET(fd,&read_fds) /*&& sock_msg_avail_on_fd(fd)*/) { tmsg = socket_recv_on_fd(fd); found = P4_TRUE; if (tmsg->ack_req & P4_ACK_REQ_MASK) { send_ack(fd, tmsg->from); } if (tmsg->ack_req & P4_CLOSE_MASK) { p4_dprintfl(20,"Received close connection on %d (fd %d)\n", i, fd ); p4_local->conntab[i].type = CONN_REMOTE_CLOSED; /* Discard the message */ free_p4_msg( tmsg ); tmsg = 0; found = P4_FALSE; /* Remember that we found a command (see code above for no connections) */ found_cmd = P4_TRUE; /* Note that if we called this because we found a message available, we may want to return a "no more messages" without doing a long wait. */ timeout_sec = 0; } } } } } else if (found_cmd && !is_blocking) break; } return (tmsg);}struct p4_msg *socket_recv_on_fd( int fd ){ int n, data_type, msg_len = -1; struct p4_msg *tmsg; struct p4_net_msg_hdr nmsg; n = net_recv(fd, &nmsg, sizeof(struct p4_net_msg_hdr)); data_type = p4_n_to_i(nmsg.data_type); if (data_type == P4NOX) msg_len = p4_n_to_i(nmsg.msg_len); else { switch (data_type) {/* Begin bugfix, compute msg_len correct, Rolf Rabenseifner,04SEP97*/ case P4INT: msg_len = p4_n_to_i(nmsg.msg_len) * sizeof(int); /* instead of XDR_INT_LEN*/ break; case P4LNG: msg_len = p4_n_to_i(nmsg.msg_len) * sizeof(long); /* instead of XDR_LNG_LEN*/ break; case P4FLT: msg_len = p4_n_to_i(nmsg.msg_len) * sizeof(float); /* instead of XDR_FLT_LEN*/ break; case P4DBL: msg_len = p4_n_to_i(nmsg.msg_len) * sizeof(double); /* instead of XDR_DBL_LEN*/ break;/* End bugfix, compute msg_len correct, Rolf Rabenseifner,04SEP97*/ default: p4_error("socket_recv_on_fd: invalid data type =", data_type); } } if (msg_len < 0) p4_error("socket_recv_on_fd: failed to set msg_len = ", msg_len); tmsg = alloc_p4_msg(msg_len); tmsg->type = p4_n_to_i(nmsg.msg_type); tmsg->to = p4_n_to_i(nmsg.to); tmsg->from = p4_n_to_i(nmsg.from); tmsg->len = p4_n_to_i(nmsg.msg_len); /* chgd by xdr_recv below */ tmsg->data_type = p4_n_to_i(nmsg.data_type); tmsg->ack_req = p4_n_to_i(nmsg.ack_req); p4_dprintfl(30,"recving imm_from: to = %d, from = %d, imm_from = %d, p4_n_to_i(imm_from) =%d in sock_recv_of_fd\n", tmsg->to, tmsg->from, nmsg.imm_from, p4_n_to_i(nmsg.imm_from)); p4_dprintfl(30,"data_type = %d, same_rep = %d\n", tmsg->data_type, p4_local->conntab[p4_n_to_i(nmsg.imm_from)].same_data_rep); if (tmsg->data_type == P4NOX || p4_local->conntab[p4_n_to_i(nmsg.imm_from)].same_data_rep) { n = net_recv(fd, (char *) &(tmsg->msg), tmsg->len); } else {# ifdef CAN_DO_XDR n = xdr_recv(fd, tmsg);# else p4_error("cannot do xdr recvs\n",0);# endif } return (tmsg);}/* This routine is scalable but the implementation isn't. See p4_sockets_ready */P4BOOL socket_msgs_available( void ){ int i, fd; int ndown = 0; for (i = 0; i < p4_global->num_in_proctable; i++) { if (p4_local->conntab[i].type == CONN_REMOTE_EST) { fd = p4_local->conntab[i].port; if (sock_msg_avail_on_fd(fd)) { return (P4_TRUE); } } else if (p4_local->conntab[i].type == CONN_REMOTE_DYING) { /* We need to detect that some are down... */ ndown++;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -