📄 oob_tcp_peer.c
字号:
mca_oob_tcp_peer_shutdown(peer); opal_evtimer_add(&peer->peer_timer_event, &tv); return ORTE_ERR_UNREACH; } /* setup socket options */ mca_oob_tcp_set_socket_options(peer->peer_sd); /* setup event callbacks */ mca_oob_tcp_peer_event_init(peer); /* setup the socket as non-blocking */ if((flags = fcntl(peer->peer_sd, F_GETFL, 0)) < 0) { opal_output(0, "[%lu,%lu,%lu]-[%lu,%lu,%lu] mca_oob_tcp_peer_connect: fcntl(F_GETFL) failed: %s (%d)\n", ORTE_NAME_ARGS(orte_process_info.my_name), ORTE_NAME_ARGS(&(peer->peer_name)), strerror(opal_socket_errno), opal_socket_errno); } else { flags |= O_NONBLOCK; if(fcntl(peer->peer_sd, F_SETFL, flags) < 0) opal_output(0, "[%lu,%lu,%lu]-[%lu,%lu,%lu] mca_oob_tcp_peer_connect: fcntl(F_SETFL) failed: %s (%d)\n", ORTE_NAME_ARGS(orte_process_info.my_name), ORTE_NAME_ARGS(&(peer->peer_name)), strerror(opal_socket_errno), opal_socket_errno); } /* * We should parse all the IP addresses exported by the peer and try to connect to each of them. */ return mca_oob_tcp_peer_try_connect(peer);}/* * Check the status of the connection. If the connection failed, will retry * later. Otherwise, send this processes identifier to the peer on the * newly connected socket. 
*/static void mca_oob_tcp_peer_complete_connect(mca_oob_tcp_peer_t* peer){ int so_error = 0; opal_socklen_t so_length = sizeof(so_error); /* unregister from receiving event notifications */ opal_event_del(&peer->peer_send_event); /* check connect completion status */ if(getsockopt(peer->peer_sd, SOL_SOCKET, SO_ERROR, (char *)&so_error, &so_length) < 0) { opal_output(0, "[%lu,%lu,%lu]-[%lu,%lu,%lu] mca_oob_tcp_peer_complete_connect: getsockopt() failed: %s (%d)\n", ORTE_NAME_ARGS(orte_process_info.my_name), ORTE_NAME_ARGS(&(peer->peer_name)), strerror(opal_socket_errno), opal_socket_errno); mca_oob_tcp_peer_close(peer); return; } if(so_error == EINPROGRESS) { opal_event_add(&peer->peer_send_event, 0); return; } else if (so_error == ECONNREFUSED || so_error == ETIMEDOUT) { struct timeval tv = { 1,0 }; opal_output(0, "[%lu,%lu,%lu]-[%lu,%lu,%lu] mca_oob_tcp_peer_complete_connect: " "connection failed: %s (%d) - retrying\n", ORTE_NAME_ARGS(orte_process_info.my_name), ORTE_NAME_ARGS(&(peer->peer_name)), strerror(so_error), so_error); mca_oob_tcp_peer_shutdown(peer); opal_evtimer_add(&peer->peer_timer_event, &tv); return; } else if(so_error != 0) { /* No need to worry about the return code here - we return regardless at this point, and if an error did occur a message has already been printed for the user */ mca_oob_tcp_peer_try_connect(peer); return; } if(mca_oob_tcp_component.tcp_debug >= OOB_TCP_DEBUG_CONNECT) { opal_output(0, "[%lu,%lu,%lu]-[%lu,%lu,%lu] mca_oob_tcp_peer_complete_connect: " "sending ack, %d", ORTE_NAME_ARGS(orte_process_info.my_name), ORTE_NAME_ARGS(&(peer->peer_name)), so_error); } if(mca_oob_tcp_peer_send_connect_ack(peer) == ORTE_SUCCESS) { peer->peer_state = MCA_OOB_TCP_CONNECT_ACK; opal_event_add(&peer->peer_recv_event, 0); } else { opal_output(0, "[%lu,%lu,%lu]-[%lu,%lu,%lu] mca_oob_tcp_peer_complete_connect: unable to send connect ack.", ORTE_NAME_ARGS(orte_process_info.my_name), ORTE_NAME_ARGS(&(peer->peer_name))); 
mca_oob_tcp_peer_close(peer); }}/* * Setup peer state to reflect that connection has been established, * and start any pending sends. */static void mca_oob_tcp_peer_connected(mca_oob_tcp_peer_t* peer){ opal_event_del(&peer->peer_timer_event); peer->peer_state = MCA_OOB_TCP_CONNECTED; peer->peer_retries = 0; if(opal_list_get_size(&peer->peer_send_queue) > 0) { if(NULL == peer->peer_send_msg) peer->peer_send_msg = (mca_oob_tcp_msg_t*) opal_list_remove_first(&peer->peer_send_queue); opal_event_add(&peer->peer_send_event, 0); }}/* * Remove any event registrations associated with the socket * and update the peer state to reflect the connection has * been closed. */void mca_oob_tcp_peer_close(mca_oob_tcp_peer_t* peer){ if(mca_oob_tcp_component.tcp_debug >= OOB_TCP_DEBUG_CONNECT) { opal_output(0, "[%lu,%lu,%lu]-[%lu,%lu,%lu] mca_oob_tcp_peer_close(%p) sd %d state %d\n", ORTE_NAME_ARGS(orte_process_info.my_name), ORTE_NAME_ARGS(&(peer->peer_name)), peer, peer->peer_sd, peer->peer_state); } /* if we lose the connection to the seed - abort */ if(memcmp(&peer->peer_name,ORTE_PROC_MY_HNP,sizeof(orte_process_name_t)) == 0) { /* If we are not already inside orte_finalize, then call abort */ if (ORTE_UNIVERSE_STATE_FINALIZE > orte_universe_info.state) { /* Should free the peer lock before we abort so we don't * get stuck in the orte_wait_kill when receiving messages in the * tcp OOB. 
*/ OPAL_THREAD_UNLOCK(&peer->peer_lock); orte_errmgr.error_detected(1, "OOB: Connection to HNP lost", NULL); } } mca_oob_tcp_peer_shutdown(peer);}void mca_oob_tcp_peer_shutdown(mca_oob_tcp_peer_t* peer){ /* giving up and cleanup any pending messages */ if(peer->peer_retries++ > mca_oob_tcp_component.tcp_peer_retries) { mca_oob_tcp_msg_t *msg; opal_output(0, "[%lu,%lu,%lu]-[%lu,%lu,%lu] mca_oob_tcp_peer_shutdown: retries exceeded", ORTE_NAME_ARGS(orte_process_info.my_name), ORTE_NAME_ARGS(&(peer->peer_name))); /* There are cases during the initial connection setup where the peer_send_msg is NULL but there are things in the queue -- handle that case */ if (NULL != (msg = peer->peer_send_msg)) { msg->msg_complete = true; msg->msg_rc = ORTE_ERR_UNREACH; mca_oob_tcp_msg_complete(msg, &peer->peer_name); } peer->peer_send_msg = NULL; while (NULL != (msg = (mca_oob_tcp_msg_t*)opal_list_remove_first(&peer->peer_send_queue))) { msg->msg_complete = true; msg->msg_rc = ORTE_ERR_UNREACH; mca_oob_tcp_msg_complete(msg, &peer->peer_name); } /* We were unsuccessful in establishing a connection, and are not likely to suddenly become successful, so abort the whole thing */ peer->peer_state = MCA_OOB_TCP_FAILED; } if (peer->peer_sd >= 0) { opal_event_del(&peer->peer_recv_event); opal_event_del(&peer->peer_send_event); CLOSE_THE_SOCKET(peer->peer_sd); peer->peer_sd = -1; } opal_event_del(&peer->peer_timer_event); peer->peer_state = MCA_OOB_TCP_CLOSED;}/* * Send the globally unique identifier for this process to a peer on * a newly connected socket. */static int mca_oob_tcp_peer_send_connect_ack(mca_oob_tcp_peer_t* peer){ /* send process identifier of self and peer - note that we may * have assigned the peer a unique process name - if it came up * without one. 
*/ mca_oob_tcp_hdr_t hdr; memset(&hdr,0,sizeof(hdr)); if (NULL == orte_process_info.my_name) { /* my name isn't defined yet */ hdr.msg_src = *ORTE_NAME_INVALID; } else { hdr.msg_src = *(orte_process_info.my_name); } hdr.msg_dst = peer->peer_name; hdr.msg_type = MCA_OOB_TCP_CONNECT; MCA_OOB_TCP_HDR_HTON(&hdr); if(mca_oob_tcp_peer_send_blocking(peer, &hdr, sizeof(hdr)) != sizeof(hdr)) { return ORTE_ERR_UNREACH; } return ORTE_SUCCESS;}/* * Receive the peers globally unique process identification from a newly * connected socket and verify the expected response. If so, move the * socket to a connected state. */static int mca_oob_tcp_peer_recv_connect_ack(mca_oob_tcp_peer_t* peer){ mca_oob_tcp_hdr_t hdr; if((mca_oob_tcp_peer_recv_blocking(peer, &hdr, sizeof(hdr))) != sizeof(hdr)) { /* If the peer state is still CONNECT_ACK, that indicates that the error was a reset from the remote host because the connection was not able to be fully established. In that case, Clean up the connection and give it another go. */ if (peer->peer_state == MCA_OOB_TCP_CONNECT_ACK) { struct timeval tv = { 1,0 }; if (mca_oob_tcp_component.tcp_debug >= OOB_TCP_DEBUG_CONNECT) { opal_output(0, "[%lu,%lu,%lu]-[%lu,%lu,%lu] mca_oob_tcp_peer_recv_connect_ack " "connect failed during receive. 
Restarting (%s).", ORTE_NAME_ARGS(orte_process_info.my_name), ORTE_NAME_ARGS(&(peer->peer_name)), strerror(opal_socket_errno)); } opal_event_del(&peer->peer_recv_event); mca_oob_tcp_peer_shutdown(peer); opal_evtimer_add(&peer->peer_timer_event, &tv); return ORTE_SUCCESS; } else { mca_oob_tcp_peer_close(peer); return ORTE_ERR_UNREACH; } } MCA_OOB_TCP_HDR_NTOH(&hdr); if(hdr.msg_type != MCA_OOB_TCP_CONNECT) { opal_output(0, "mca_oob_tcp_peer_recv_connect_ack: invalid header type: %d\n", hdr.msg_type); mca_oob_tcp_peer_close(peer); return ORTE_ERR_UNREACH; } /* compare the peers name to the expected value */ if(memcmp(&peer->peer_name, &hdr.msg_src, sizeof(orte_process_name_t)) != 0) { opal_output(0, "[%lu,%lu,%lu]-[%lu,%lu,%lu] mca_oob_tcp_peer_recv_connect_ack: " "received unexpected process identifier [%d,%d,%d]\n", ORTE_NAME_ARGS(orte_process_info.my_name), ORTE_NAME_ARGS(&(peer->peer_name)), ORTE_NAME_ARGS(&(hdr.msg_src))); mca_oob_tcp_peer_close(peer); return ORTE_ERR_UNREACH; } /* if we have an invalid name or do not have one assigned at all - * use the name returned by the peer. This needs to be a LITERAL * comparison - we do NOT want wildcard values to return EQUAL */ if(orte_process_info.my_name == NULL) { orte_ns.create_process_name(&orte_process_info.my_name, hdr.msg_dst.cellid, hdr.msg_dst.jobid, hdr.msg_dst.vpid); } else if (orte_ns.compare_fields(ORTE_NS_CMP_ALL, orte_process_info.my_name, ORTE_NAME_INVALID) == ORTE_EQUAL) { *orte_process_info.my_name = hdr.msg_dst; } /* connected */ mca_oob_tcp_peer_connected(peer); if(mca_oob_tcp_component.tcp_debug >= OOB_TCP_DEBUG_CONNECT) { mca_oob_tcp_peer_dump(peer, "connected"); } return ORTE_SUCCESS;}/* * A blocking recv on a non-blocking socket. Used to receive the small amount of connection * information that identifies the peers endpoint. 
*/
static int mca_oob_tcp_peer_recv_blocking(mca_oob_tcp_peer_t* peer, void* data, size_t size)
{
    /* Loops until exactly `size` bytes have been read, retrying on
     * EINTR/EAGAIN/EWOULDBLOCK.  Returns `size` on success and -1 on failure.
     * On failure the peer has been closed, except for a hard recv() error
     * while in MCA_OOB_TCP_CONNECT_ACK, which is left to the caller so the
     * handshake can be retried. */
    unsigned char* ptr = (unsigned char*)data;
    size_t cnt = 0;
    while(cnt < size) {
        int retval = recv(peer->peer_sd, (char *)ptr + cnt, size - cnt, 0);

        /* remote closed connection */
        if(retval == 0) {
            if(mca_oob_tcp_component.tcp_debug >= OOB_TCP_DEBUG_INFO) {
                opal_output(0, "[%lu,%lu,%lu]-[%lu,%lu,%lu] mca_oob_tcp_peer_recv_blocking: "
                            "peer closed connection: peer state %d",
                            ORTE_NAME_ARGS(orte_process_info.my_name),
                            ORTE_NAME_ARGS(&(peer->peer_name)),
                            peer->peer_state);
            }
            mca_oob_tcp_peer_close(peer);
            return -1;
        }

        /* socket is non-blocking so handle errors */
        if(retval < 0) {
            /* Read the socket error once, via opal_socket_errno like the rest
             * of this file, so the logging below cannot clobber it. */
            int err = opal_socket_errno;
            if(err != EINTR && err != EAGAIN && err != EWOULDBLOCK) {
                if(peer->peer_state == MCA_OOB_TCP_CONNECT_ACK) {
                    /* If we overflow the listen backlog, it's possible that
                       even though we finished the three way handshake, the
                       remote host was unable to transition the connection
                       from half connected (received the initial SYN) to fully
                       connected (in the listen backlog).  We likely won't see
                       the failure until we try to receive, due to timing and
                       the like.  The first thing we'll get in that case is a
                       RST packet, which receive will turn into a connection
                       reset by peer errno.  In that case, leave the socket in
                       CONNECT_ACK and propagate the error up to
                       recv_connect_ack, who will try to establish the
                       connection again */
                    return -1;
                }
                opal_output(0, "[%lu,%lu,%lu]-[%lu,%lu,%lu] mca_oob_tcp_peer_recv_blocking: "
                            "recv() failed: %s (%d)\n",
                            ORTE_NAME_ARGS(orte_process_info.my_name),
                            ORTE_NAME_ARGS(&(peer->peer_name)),
                            strerror(err),
                            err);
                mca_oob_tcp_peer_close(peer);
                return -1;
            }
            /* transient condition - try again */
            continue;
        }
        cnt += retval;
    }
    return (int)cnt;
}

/*
 * A blocking send on a non-blocking socket. Used to send the small amount of connection
 * information that identifies the peers endpoint.
*/static int mca_oob_tcp_peer_send_blocking(mca_oob_tcp_peer_t* peer, void* data, size_t size){ unsigned char* ptr = (unsigned char*)data; size_t cnt = 0; while(cnt < size) { int retval = send(peer->peer_sd, (char *)ptr+cnt, size-cnt, 0);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -