/* oob_tcp_peer.c -- ORTE OOB/TCP peer connection handling (trailing chunk).
 * NOTE(review): the lines above/below the code were code-viewer UI chrome
 * captured during extraction; replaced with this comment. */
    /* Tail of mca_oob_tcp_peer_send_blocking(): loop until the requested
     * number of bytes has been written to the socket.
     * NOTE(review): the head of this function lies outside this chunk. */
    if(retval < 0) {
        /* EINTR/EAGAIN/EWOULDBLOCK are transient -- retry the send;
         * any other errno is fatal for this connection. */
        if(opal_socket_errno != EINTR &&
           opal_socket_errno != EAGAIN &&
           opal_socket_errno != EWOULDBLOCK) {
            opal_output(0, "[%lu,%lu,%lu]-[%lu,%lu,%lu] mca_oob_tcp_peer_send_blocking: send() failed: %s (%d)\n",
                ORTE_NAME_ARGS(orte_process_info.my_name),
                ORTE_NAME_ARGS(&(peer->peer_name)),
                strerror(opal_socket_errno),
                opal_socket_errno);
            mca_oob_tcp_peer_close(peer);
            return -1;
        }
        continue;
    }
    cnt += retval;
    }
    /* full count was transmitted */
    return cnt;
}

/*
 * Send a zero-payload IDENT header so the peer can associate this
 * connection with our process name.
 *
 * @param peer  the peer the identification is sent to
 * @return ORTE_SUCCESS (also when not connected -- a no-op then),
 *         or ORTE_ERR_UNREACH if the blocking send came up short.
 */
int mca_oob_tcp_peer_send_ident(mca_oob_tcp_peer_t* peer)
{
    mca_oob_tcp_hdr_t hdr;
    /* only meaningful on an established connection */
    if(peer->peer_state != MCA_OOB_TCP_CONNECTED)
        return ORTE_SUCCESS;
    hdr.msg_src = *orte_process_info.my_name;
    hdr.msg_dst = peer->peer_name;
    hdr.msg_type = MCA_OOB_TCP_IDENT;
    hdr.msg_size = 0;
    hdr.msg_tag = 0;
    /* convert header to network byte order before it hits the wire */
    MCA_OOB_TCP_HDR_HTON(&hdr);
    if(mca_oob_tcp_peer_send_blocking(peer, &hdr, sizeof(hdr)) != sizeof(hdr))
        return ORTE_ERR_UNREACH;
    return ORTE_SUCCESS;
}

/* static void mca_oob_tcp_peer_recv_ident(mca_oob_tcp_peer_t* peer, mca_oob_tcp_hdr_t* hdr) */
/* { */
/*     OPAL_THREAD_LOCK(&mca_oob_tcp_component.tcp_lock); */
/*     ompi_rb_tree_delete(&mca_oob_tcp_component.tcp_peer_tree, &peer->peer_name); */
/*     peer->peer_name = hdr->msg_src; */
/*     ompi_rb_tree_insert(&mca_oob_tcp_component.tcp_peer_tree, &peer->peer_name, peer); */
/*     OPAL_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_lock); */
/* } */

/*
 * Dispatch to the appropriate action routine based on the state
 * of the connection with the peer.
*/static void mca_oob_tcp_peer_recv_handler(int sd, short flags, void* user){ mca_oob_tcp_peer_t* peer = (mca_oob_tcp_peer_t *)user; OPAL_THREAD_LOCK(&peer->peer_lock); switch(peer->peer_state) { case MCA_OOB_TCP_CONNECT_ACK: { mca_oob_tcp_peer_recv_connect_ack(peer); break; } case MCA_OOB_TCP_CONNECTED: { /* allocate a new message and setup for recv */ if(NULL == peer->peer_recv_msg) { int rc; mca_oob_tcp_msg_t* msg; MCA_OOB_TCP_MSG_ALLOC(msg, rc); if(NULL == msg) { opal_output(0, "[%lu,%lu,%lu]-[%lu,%lu,%lu] mca_oob_tcp_peer_recv_handler: unable to allocate recv message\n", ORTE_NAME_ARGS(orte_process_info.my_name), ORTE_NAME_ARGS(&(peer->peer_name))); return; } msg->msg_type = MCA_OOB_TCP_UNEXPECTED; msg->msg_rc = 0; msg->msg_flags = 0; msg->msg_peer = peer->peer_name; msg->msg_rwiov = mca_oob_tcp_msg_iov_alloc(msg,2); msg->msg_rwbuf = NULL; msg->msg_rwcnt = msg->msg_rwnum = 1; msg->msg_rwptr = msg->msg_rwiov; msg->msg_rwiov[0].iov_base = (ompi_iov_base_ptr_t)&msg->msg_hdr; msg->msg_rwiov[0].iov_len = sizeof(msg->msg_hdr); peer->peer_recv_msg = msg; } if (peer->peer_recv_msg && mca_oob_tcp_msg_recv_handler(peer->peer_recv_msg, peer)) { mca_oob_tcp_msg_t* msg = peer->peer_recv_msg; peer->peer_recv_msg = NULL; OPAL_THREAD_UNLOCK(&peer->peer_lock); mca_oob_tcp_msg_recv_complete(msg, peer); return; } break; } default: { opal_output(0, "[%lu,%lu,%lu]-[%lu,%lu,%lu] mca_oob_tcp_peer_recv_handler: invalid socket state(%d)", ORTE_NAME_ARGS(orte_process_info.my_name), ORTE_NAME_ARGS(&(peer->peer_name)), peer->peer_state); mca_oob_tcp_peer_close(peer); break; } } OPAL_THREAD_UNLOCK(&peer->peer_lock);}/* * A file descriptor is available/ready for send. Check the state * of the socket and take the appropriate action. 
*/static void mca_oob_tcp_peer_send_handler(int sd, short flags, void* user){ mca_oob_tcp_peer_t* peer = (mca_oob_tcp_peer_t *)user; OPAL_THREAD_LOCK(&peer->peer_lock); switch(peer->peer_state) { case MCA_OOB_TCP_CONNECTING: mca_oob_tcp_peer_complete_connect(peer); break; case MCA_OOB_TCP_CONNECTED: { while(peer->peer_send_msg != NULL) { /* complete the current send */ mca_oob_tcp_msg_t* msg = peer->peer_send_msg; if(mca_oob_tcp_msg_send_handler(msg, peer)) { mca_oob_tcp_msg_complete(msg, &peer->peer_name); } else { break; } /* if current completed - progress any pending sends */ peer->peer_send_msg = (mca_oob_tcp_msg_t*) opal_list_remove_first(&peer->peer_send_queue); } /* if nothing else to do unregister for send event notifications */ if(NULL == peer->peer_send_msg) { opal_event_del(&peer->peer_send_event); } break; } default: opal_output(0, "[%lu,%lu,%lu]-[%lu,%lu,%lu] mca_oob_tcp_peer_send_handler: invalid connection state (%d)", ORTE_NAME_ARGS(orte_process_info.my_name), ORTE_NAME_ARGS(&(peer->peer_name)), peer->peer_state); opal_event_del(&peer->peer_send_event); break; } OPAL_THREAD_UNLOCK(&peer->peer_lock);} /* * Routine for debugging to print the connection state and socket options */static void mca_oob_tcp_peer_dump(mca_oob_tcp_peer_t* peer, const char* msg){ char src[64]; char dst[64]; char buff[255]; int sndbuf,rcvbuf,nodelay,flags; struct sockaddr_in inaddr; opal_socklen_t optlen; opal_socklen_t addrlen = sizeof(struct sockaddr_in); getsockname(peer->peer_sd, (struct sockaddr*)&inaddr, &addrlen); sprintf(src, "%s", inet_ntoa(inaddr.sin_addr)); getpeername(peer->peer_sd, (struct sockaddr*)&inaddr, &addrlen); sprintf(dst, "%s", inet_ntoa(inaddr.sin_addr)); if((flags = fcntl(peer->peer_sd, F_GETFL, 0)) < 0) { opal_output(0, "mca_oob_tcp_peer_dump: fcntl(F_GETFL) failed: %s (%d)\n", strerror(opal_socket_errno), opal_socket_errno); } #if defined(SO_SNDBUF) optlen = sizeof(sndbuf); if(getsockopt(peer->peer_sd, SOL_SOCKET, SO_SNDBUF, (char *)&sndbuf, 
&optlen) < 0) { opal_output(0, "mca_oob_tcp_peer_dump: SO_SNDBUF option: %s (%d)\n", strerror(opal_socket_errno), opal_socket_errno); }#else sndbuf = -1;#endif#if defined(SO_RCVBUF) optlen = sizeof(rcvbuf); if(getsockopt(peer->peer_sd, SOL_SOCKET, SO_RCVBUF, (char *)&rcvbuf, &optlen) < 0) { opal_output(0, "mca_oob_tcp_peer_dump: SO_RCVBUF option: %s (%d)\n", strerror(opal_socket_errno), opal_socket_errno); }#else rcvbuf = -1;#endif#if defined(TCP_NODELAY) optlen = sizeof(nodelay); if(getsockopt(peer->peer_sd, IPPROTO_TCP, TCP_NODELAY, (char *)&nodelay, &optlen) < 0) { opal_output(0, "mca_oob_tcp_peer_dump: TCP_NODELAY option: %s (%d)\n", strerror(opal_socket_errno), opal_socket_errno); }#else nodelay = 0;#endif sprintf(buff, "[%lu,%lu,%lu]-[%lu,%lu,%lu] %s: %s - %s nodelay %d sndbuf %d rcvbuf %d flags %08x\n", ORTE_NAME_ARGS(orte_process_info.my_name), ORTE_NAME_ARGS(&(peer->peer_name)), msg, src, dst, nodelay, sndbuf, rcvbuf, flags); opal_output(0, buff);}/* * Accept incoming connection - if not already connected. We compare the name of the * peer to our own name using the ns.compare_fields function as we want this to be * a LITERAL comparison - i.e., there is no occasion when the peer's name should * be a wildcard value. * * To avoid competing reciprocal connection attempts, we only accept connections from * processes whose names are "greater" than our own. 
 */
bool mca_oob_tcp_peer_accept(mca_oob_tcp_peer_t* peer, int sd)
{
    int cmpval;
    OPAL_THREAD_LOCK(&peer->peer_lock);
    /* literal (non-wildcard) comparison of the peer's name to our own */
    cmpval = orte_ns.compare_fields(ORTE_NS_CMP_ALL, &peer->peer_name, orte_process_info.my_name);
    /* accept when we have no usable connection, or when a connection
     * race is in progress and the tie-break (peer name greater than
     * ours) says the inbound side wins */
    if ((peer->peer_state == MCA_OOB_TCP_CLOSED) ||
        (peer->peer_state == MCA_OOB_TCP_RESOLVE) ||
        (peer->peer_state != MCA_OOB_TCP_CONNECTED &&
         cmpval == ORTE_VALUE1_GREATER)) {

        /* drop any half-built outbound connection before adopting sd */
        if(peer->peer_state != MCA_OOB_TCP_CLOSED) {
            mca_oob_tcp_peer_close(peer);
        }
        peer->peer_sd = sd;
        mca_oob_tcp_peer_event_init(peer);

        if(mca_oob_tcp_peer_send_connect_ack(peer) != ORTE_SUCCESS) {
            opal_output(0, "[%lu,%lu,%lu]-[%lu,%lu,%lu] mca_oob_tcp_peer_accept: "
                "mca_oob_tcp_peer_send_connect_ack failed\n",
                ORTE_NAME_ARGS(orte_process_info.my_name),
                ORTE_NAME_ARGS(&(peer->peer_name)));
            mca_oob_tcp_peer_close(peer);
            OPAL_THREAD_UNLOCK(&peer->peer_lock);
            return false;
        }
        /* mark connected and start listening for inbound traffic */
        mca_oob_tcp_peer_connected(peer);
        opal_event_add(&peer->peer_recv_event, 0);
        if(mca_oob_tcp_component.tcp_debug > 0) {
            mca_oob_tcp_peer_dump(peer, "accepted");
        }
        OPAL_THREAD_UNLOCK(&peer->peer_lock);
        return true;
    }
    /* connection refused -- caller keeps/cleans up sd */
    OPAL_THREAD_UNLOCK(&peer->peer_lock);
    return false;
}

/*
 * Resolve process name to an actual internet address.
 */
void mca_oob_tcp_peer_resolved(mca_oob_tcp_peer_t* peer, mca_oob_tcp_addr_t* addr)
{
    OPAL_THREAD_LOCK(&peer->peer_lock);
    peer->peer_addr = addr;
    /* kick off the connect if we were waiting on the address, or if the
     * peer is closed but messages are already queued for it */
    if((peer->peer_state == MCA_OOB_TCP_RESOLVE) ||
       (peer->peer_state == MCA_OOB_TCP_CLOSED &&
        opal_list_get_size(&peer->peer_send_queue))) {
        mca_oob_tcp_peer_start_connect(peer);
    }
    OPAL_THREAD_UNLOCK(&peer->peer_lock);
}

/*
 * Callback on timeout - retry connection attempt.
 */
static void mca_oob_tcp_peer_timer_handler(int sd, short flags, void* user)
{
    /* start the connection to the peer */
    mca_oob_tcp_peer_t* peer = (mca_oob_tcp_peer_t*)user;
    OPAL_THREAD_LOCK(&peer->peer_lock);
    /* only retry if nothing else re-established the connection meanwhile */
    if(peer->peer_state == MCA_OOB_TCP_CLOSED)
        mca_oob_tcp_peer_start_connect(peer);
    OPAL_THREAD_UNLOCK(&peer->peer_lock);
}

/*
 * Remove any references to the indicated message.
*/void mca_oob_tcp_peer_dequeue_msg(mca_oob_tcp_peer_t* peer, mca_oob_tcp_msg_t* msg){ opal_list_item_t* item; OPAL_THREAD_LOCK(&peer->peer_lock); if (peer->peer_send_msg == msg) peer->peer_send_msg = NULL; if (peer->peer_recv_msg == msg) peer->peer_recv_msg = NULL; for( item = opal_list_get_first(&peer->peer_send_queue); item != opal_list_get_end(&peer->peer_send_queue); item = opal_list_get_next(item)) { if(item == (opal_list_item_t*)msg) { opal_list_remove_item(&peer->peer_send_queue, item); break; } } OPAL_THREAD_UNLOCK(&peer->peer_lock);}/** * Set socket buffering */void mca_oob_tcp_set_socket_options(int sd){ int optval;#if defined(TCP_NODELAY) optval = 1; if(setsockopt(sd, IPPROTO_TCP, TCP_NODELAY, (char *)&optval, sizeof(optval)) < 0) { opal_output(0, "[%s:%d] setsockopt(TCP_NODELAY) failed: %s (%d)", __FILE__, __LINE__, strerror(opal_socket_errno), opal_socket_errno); }#endif#if defined(SO_SNDBUF) if(mca_oob_tcp_component.tcp_sndbuf > 0 && setsockopt(sd, SOL_SOCKET, SO_SNDBUF, (char *)&mca_oob_tcp_component.tcp_sndbuf, sizeof(int)) < 0) { opal_output(0, "[%s:%d] setsockopt(SO_SNDBUF) failed: %s (%d)", __FILE__, __LINE__, strerror(opal_socket_errno), opal_socket_errno); }#endif#if defined(SO_RCVBUF) if(mca_oob_tcp_component.tcp_rcvbuf > 0 && setsockopt(sd, SOL_SOCKET, SO_RCVBUF, (char *)&mca_oob_tcp_component.tcp_rcvbuf, sizeof(int)) < 0) { opal_output(0, "[%s:%d] setsockopt(SO_RCVBUF) failed: %s (%d)", __FILE__, __LINE__, strerror(opal_socket_errno), opal_socket_errno); }#endif}
/* (removed: code-viewer keyboard-shortcut help text accidentally captured
 * with the source; not part of oob_tcp_peer.c) */