📄 oob_tcp_peer.c
字号:
/*
 * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
 *                         University Research and Technology
 *                         Corporation.  All rights reserved.
 * Copyright (c) 2004-2006 The University of Tennessee and The University
 *                         of Tennessee Research Foundation.  All rights
 *                         reserved.
 * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
 *                         University of Stuttgart.  All rights reserved.
 * Copyright (c) 2004-2005 The Regents of the University of California.
 *                         All rights reserved.
 * Copyright (c) 2006-2007 Los Alamos National Security, LLC.
 *                         All rights reserved.
 * $COPYRIGHT$
 *
 * Additional copyrights may follow
 *
 * $HEADER$
 *
 * In windows, many of the socket functions return an EWOULDBLOCK
 * instead of \ things like EAGAIN, EINPROGRESS, etc. It has been
 * verified that this will \ not conflict with other error codes that
 * are returned by these functions \ under UNIX/Linux environments
 */

#include "orte_config.h"

#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif
#include <fcntl.h>
#ifdef HAVE_SYS_UIO_H
#include <sys/uio.h>
#endif
#ifdef HAVE_SYS_TYPES_H
#include <sys/types.h>
#endif
#include "opal/opal_socket_errno.h"
#ifdef HAVE_NETINET_IN_H
#include <netinet/in.h>
#endif
#ifdef HAVE_ARPA_INET_H
#include <arpa/inet.h>
#endif
#ifdef HAVE_NETINET_TCP_H
#include <netinet/tcp.h>
#endif

#include "orte/class/orte_proc_table.h"
#include "opal/util/output.h"
#include "orte/util/univ_info.h"
#include "orte/mca/gpr/gpr.h"
#include "orte/mca/ns/ns.h"
#include "orte/mca/errmgr/errmgr.h"

#include "oob_tcp.h"
#include "oob_tcp_peer.h"

/* Forward declarations of the file-local helpers. */
static int  mca_oob_tcp_peer_start_connect(mca_oob_tcp_peer_t *peer);
static int  mca_oob_tcp_peer_event_init(mca_oob_tcp_peer_t *peer);
static void mca_oob_tcp_peer_connected(mca_oob_tcp_peer_t *peer);
static void mca_oob_tcp_peer_construct(mca_oob_tcp_peer_t *peer);
static void mca_oob_tcp_peer_destruct(mca_oob_tcp_peer_t *peer);
static int  mca_oob_tcp_peer_send_connect_ack(mca_oob_tcp_peer_t *peer);
static int  mca_oob_tcp_peer_recv_connect_ack(mca_oob_tcp_peer_t *peer);
static int  mca_oob_tcp_peer_recv_blocking(mca_oob_tcp_peer_t *peer, void *data, size_t size);
static int  mca_oob_tcp_peer_send_blocking(mca_oob_tcp_peer_t *peer, void *data, size_t size);
static void mca_oob_tcp_peer_recv_handler(int sd, short flags, void *user);
static void mca_oob_tcp_peer_send_handler(int sd, short flags, void *user);
static void mca_oob_tcp_peer_timer_handler(int sd, short flags, void *user);
static void mca_oob_tcp_peer_dump(mca_oob_tcp_peer_t *peer, const char *msg);

OBJ_CLASS_INSTANCE(
    mca_oob_tcp_peer_t,
    opal_free_list_item_t,
    mca_oob_tcp_peer_construct,
    mca_oob_tcp_peer_destruct);

/*
 * Class constructor for mca_oob_tcp_peer_t.
 *
 * Sets up the send queue and the peer lock, clears the three event
 * structures and registers mca_oob_tcp_peer_timer_handler as the callback
 * of the timer event (with the peer itself as the callback argument).
 *
 * This function and OBJ_NEW should NEVER be called directly; use
 * mca_oob_tcp_add_peer instead.
 *
 * @param peer  pointer to the mca_oob_tcp_peer_t struct to be initialized
 * @retval none
 */
static void mca_oob_tcp_peer_construct(mca_oob_tcp_peer_t *peer)
{
    OBJ_CONSTRUCT(&peer->peer_send_queue, opal_list_t);
    OBJ_CONSTRUCT(&peer->peer_lock, opal_mutex_t);

    /* start from all-zero event structures */
    memset(&peer->peer_send_event,  0, sizeof(peer->peer_send_event));
    memset(&peer->peer_recv_event,  0, sizeof(peer->peer_recv_event));
    memset(&peer->peer_timer_event, 0, sizeof(peer->peer_timer_event));

    opal_evtimer_set(&peer->peer_timer_event, mca_oob_tcp_peer_timer_handler, peer);
}

/*
 * Class destructor for mca_oob_tcp_peer_t.
 *
 * Shuts the peer down via mca_oob_tcp_peer_shutdown() and then releases
 * the send queue and the peer lock.
 *
 * This function and OBJ_RELEASE should NEVER be called directly; use
 * mca_oob_tcp_del_peer instead.
 *
 * @param peer  pointer to the mca_oob_tcp_peer_t struct to be destroyed
 * @retval none
 */
static void mca_oob_tcp_peer_destruct(mca_oob_tcp_peer_t *peer)
{
    mca_oob_tcp_peer_shutdown(peer);

    OBJ_DESTRUCT(&peer->peer_send_queue);
    OBJ_DESTRUCT(&peer->peer_lock);
}

/*
 * Initialize events to be used by the peer instance for TCP select/poll callbacks.
*/
static int mca_oob_tcp_peer_event_init(mca_oob_tcp_peer_t *peer)
{
    /* read side: persistent read event on the peer socket */
    memset(&peer->peer_recv_event, 0, sizeof(peer->peer_recv_event));
    opal_event_set(&peer->peer_recv_event,
                   peer->peer_sd,
                   OPAL_EV_READ | OPAL_EV_PERSIST,
                   mca_oob_tcp_peer_recv_handler,
                   peer);

    /* write side: persistent write event on the same socket */
    memset(&peer->peer_send_event, 0, sizeof(peer->peer_send_event));
    opal_event_set(&peer->peer_send_event,
                   peer->peer_sd,
                   OPAL_EV_WRITE | OPAL_EV_PERSIST,
                   mca_oob_tcp_peer_send_handler,
                   peer);

    return ORTE_SUCCESS;
}

/*
 * Hand a message to a peer, acting on the current connection state.
 *
 *  - CLOSED:      queue the message, move the peer to RESOLVE and return
 *                 the result of mca_oob_tcp_resolve() (called with the
 *                 peer lock already released).
 *  - CONNECTING / CONNECT_ACK / RESOLVE:
 *                 queue the message only.
 *  - FAILED:      nothing is queued; ORTE_ERR_UNREACH is returned.
 *  - CONNECTED:   queue the message if another send is already in
 *                 progress; otherwise try to send it right away and, if it
 *                 does not complete, make it the current send message and
 *                 arm the send event.
 *
 * @param peer  the peer to send to
 * @param msg   the message to send
 * @retval ORTE_SUCCESS, ORTE_ERR_UNREACH, or the value returned by
 *         mca_oob_tcp_resolve()
 */
int mca_oob_tcp_peer_send(mca_oob_tcp_peer_t *peer, mca_oob_tcp_msg_t *msg)
{
    int rc = ORTE_SUCCESS;

    OPAL_THREAD_LOCK(&peer->peer_lock);

    switch (peer->peer_state) {
    case MCA_OOB_TCP_CLOSED:
        /* queue the message and attempt to resolve the peer address */
        opal_list_append(&peer->peer_send_queue, (opal_list_item_t *)msg);
        peer->peer_state = MCA_OOB_TCP_RESOLVE;
        OPAL_THREAD_UNLOCK(&peer->peer_lock);
        return mca_oob_tcp_resolve(peer);

    case MCA_OOB_TCP_CONNECTING:
    case MCA_OOB_TCP_CONNECT_ACK:
    case MCA_OOB_TCP_RESOLVE:
        /* connection setup already under way: just queue the message */
        opal_list_append(&peer->peer_send_queue, (opal_list_item_t *)msg);
        break;

    case MCA_OOB_TCP_FAILED:
        rc = ORTE_ERR_UNREACH;
        break;

    case MCA_OOB_TCP_CONNECTED:
        if (NULL != peer->peer_send_msg) {
            /* another send is in flight: wait our turn */
            opal_list_append(&peer->peer_send_queue, (opal_list_item_t *)msg);
        } else if (mca_oob_tcp_msg_send_handler(msg, peer)) {
            /* the whole message went out immediately */
            mca_oob_tcp_msg_complete(msg, &peer->peer_name);
        } else {
            /* partial send: finish it from the send event callback */
            peer->peer_send_msg = msg;
            opal_event_add(&peer->peer_send_event, 0);
        }
        break;
    }

    OPAL_THREAD_UNLOCK(&peer->peer_lock);
    return rc;
}

/*
 * Lookup a peer by name, create one if it doesn't exist.
 * @param name  Peers globally unique identifier.
 * @retval      Pointer to the newly created structure or NULL on error.
*/
mca_oob_tcp_peer_t *mca_oob_tcp_peer_lookup(const orte_process_name_t *name)
{
    int rc;
    mca_oob_tcp_peer_t *peer, *old;

    if (NULL == name) {
        /* can't look this one up */
        return NULL;
    }

    OPAL_THREAD_LOCK(&mca_oob_tcp_component.tcp_lock);

    /* fast path: the peer is already in the lookup table */
    peer = (mca_oob_tcp_peer_t *)orte_hash_table_get_proc(
        &mca_oob_tcp_component.tcp_peers, name);
    if (NULL != peer &&
        memcmp(&peer->peer_name, name, sizeof(peer->peer_name)) == 0) {
        OPAL_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_lock);
        return peer;
    }

    /* allocate from free list */
    MCA_OOB_TCP_PEER_ALLOC(peer, rc);
    if (NULL == peer) {
        OPAL_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_lock);
        return NULL;
    }

    /* initialize peer state */
    peer->peer_name = *name;
    peer->peer_addr = NULL;
    peer->peer_sd = -1;
    peer->peer_state = MCA_OOB_TCP_CLOSED;
    peer->peer_recv_msg = NULL;
    peer->peer_send_msg = NULL;
    peer->peer_retries = 0;

    /* add to lookup table */
    if (ORTE_SUCCESS != orte_hash_table_set_proc(&mca_oob_tcp_component.tcp_peers,
                                                 &peer->peer_name, peer)) {
        MCA_OOB_TCP_PEER_RETURN(peer);
        OPAL_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_lock);
        return NULL;
    }

    /* if the peer list is over the maximum size, remove one unused peer */
    opal_list_prepend(&mca_oob_tcp_component.tcp_peer_list, (opal_list_item_t *)peer);
    if (mca_oob_tcp_component.tcp_peer_limit > 0 &&
        (int)opal_list_get_size(&mca_oob_tcp_component.tcp_peer_list) >
            mca_oob_tcp_component.tcp_peer_limit) {
        /*
         * Walk backwards from the tail of the list looking for a peer with
         * no queued sends and no receive in progress.  The new peer was just
         * prepended, so it is the head of the list: stopping when we reach
         * it both terminates the walk and guarantees that we never evict
         * the very peer we are about to return to the caller.
         */
        for (old = (mca_oob_tcp_peer_t *)opal_list_get_last(&mca_oob_tcp_component.tcp_peer_list);
             old != peer;
             old = (mca_oob_tcp_peer_t *)opal_list_get_prev(old)) {
            /* the idle test must look at the eviction candidate, not at
             * the freshly created peer (whose peer_recv_msg is always NULL) */
            if (0 == opal_list_get_size(&(old->peer_send_queue)) &&
                NULL == old->peer_recv_msg) {
                opal_list_remove_item(&mca_oob_tcp_component.tcp_peer_list,
                                      (opal_list_item_t *)old);
                MCA_OOB_TCP_PEER_RETURN(old);
                break;
            }
        }
        /* if the loop ran to completion we tried, but couldn't find one
         * that was valid to get rid of.  Oh well. */
    }

    OPAL_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_lock);
    return peer;
}

/*
 * Try connecting to a peer using all the addresses that peer exported.
 *
 * Each pass asks mca_oob_tcp_addr_get_next() for the next address and
 * issues a connect() on the peer socket:
 *  - if the connect is still in progress (EINPROGRESS / EWOULDBLOCK) the
 *    send event is armed and ORTE_SUCCESS is returned;
 *  - if the connect completes immediately, the connect ack is sent; on
 *    success the peer moves to MCA_OOB_TCP_CONNECT_ACK, the recv event is
 *    armed and ORTE_SUCCESS is returned;
 *  - on any other failure the next address is tried for as long as
 *    peer->peer_addr->addr_next is non-zero.
 *
 * @param peer  the peer to connect to
 * @retval ORTE_SUCCESS     connect started or connect ack sent
 * @retval ORTE_ERR_UNREACH no address could be obtained or none worked;
 *                          the peer has been closed via
 *                          mca_oob_tcp_peer_close()
 */
static int mca_oob_tcp_peer_try_connect(mca_oob_tcp_peer_t *peer)
{
    struct sockaddr_in inaddr;
    int rc;

    do {
        /* pick an address in round-robin fashion from the list exported by the peer */
        if ((rc = mca_oob_tcp_addr_get_next(peer->peer_addr, &inaddr)) != ORTE_SUCCESS) {
            opal_output(0,
                "[%lu,%lu,%lu]-[%lu,%lu,%lu] mca_oob_tcp_peer_try_connect: "
                "mca_oob_tcp_addr_get_next failed with error=%d",
                ORTE_NAME_ARGS(orte_process_info.my_name),
                ORTE_NAME_ARGS(&(peer->peer_name)),
                rc);
            mca_oob_tcp_peer_close(peer);
            return ORTE_ERR_UNREACH;
        }

        if (mca_oob_tcp_component.tcp_debug >= OOB_TCP_DEBUG_CONNECT) {
            opal_output(0,
                "[%lu,%lu,%lu]-[%lu,%lu,%lu] mca_oob_tcp_peer_try_connect: "
                "connecting port %d to: %s:%d\n",
                ORTE_NAME_ARGS(orte_process_info.my_name),
                ORTE_NAME_ARGS(&(peer->peer_name)),
                ntohs(mca_oob_tcp_component.tcp_listen_port),
                inet_ntoa(inaddr.sin_addr),
                ntohs(inaddr.sin_port));
        }

        /* start the connect - will likely fail with EINPROGRESS */
        if (connect(peer->peer_sd, (struct sockaddr *)&inaddr, sizeof(struct sockaddr_in)) < 0) {
            /* non-blocking so wait for completion */
            if (opal_socket_errno == EINPROGRESS || opal_socket_errno == EWOULDBLOCK) {
                opal_event_add(&peer->peer_send_event, 0);
                return ORTE_SUCCESS;
            }
            opal_output(0,
                "[%lu,%lu,%lu]-[%lu,%lu,%lu] mca_oob_tcp_peer_try_connect: "
                "connect to %s:%d failed: %s (%d)",
                ORTE_NAME_ARGS(orte_process_info.my_name),
                ORTE_NAME_ARGS(&(peer->peer_name)),
                inet_ntoa(inaddr.sin_addr),
                ntohs(inaddr.sin_port),
                strerror(opal_socket_errno),
                opal_socket_errno);
            continue;
        }

        /* send our globally unique process identifier to the peer */
        if ((rc = mca_oob_tcp_peer_send_connect_ack(peer)) == ORTE_SUCCESS) {
            peer->peer_state = MCA_OOB_TCP_CONNECT_ACK;
            opal_event_add(&peer->peer_recv_event, 0);
            return ORTE_SUCCESS;
        } else {
            /* rc is an integer status code, so it is printed with %d only;
             * the conversions must match the arguments one for one */
            opal_output(0,
                "[%lu,%lu,%lu]-[%lu,%lu,%lu] mca_oob_tcp_peer_try_connect: "
                "mca_oob_tcp_peer_send_connect_ack to %s:%d failed with error=%d",
                ORTE_NAME_ARGS(orte_process_info.my_name),
                ORTE_NAME_ARGS(&(peer->peer_name)),
                inet_ntoa(inaddr.sin_addr),
                ntohs(inaddr.sin_port),
                rc);
        }
    } while (peer->peer_addr->addr_next != 0);

    /* None of the interfaces worked.. */
    opal_output(0,
        "[%lu,%lu,%lu]-[%lu,%lu,%lu] mca_oob_tcp_peer_try_connect: "
        "connect to %s:%d failed, connecting over all interfaces failed!",
        ORTE_NAME_ARGS(orte_process_info.my_name),
        ORTE_NAME_ARGS(&(peer->peer_name)),
        inet_ntoa(inaddr.sin_addr),
        ntohs(inaddr.sin_port));
    mca_oob_tcp_peer_close(peer);
    return ORTE_ERR_UNREACH;
}

/*
 * Start a connection to the peer. This will likely not complete,
 * as the socket is set to non-blocking, so register for event
 * notification of connect completion. On connection we send
 * our globally unique process identifier to the peer and wait for
 * the peers response.
 */
static int mca_oob_tcp_peer_start_connect(mca_oob_tcp_peer_t* peer)
{
    int flags;

    /* create socket */
    peer->peer_state = MCA_OOB_TCP_CONNECTING;
    peer->peer_sd = socket(AF_INET, SOCK_STREAM, 0);
    if (peer->peer_sd < 0) {
        /* if we didn't successfully connect, wait 1 second and then try again */
        struct timeval tv = { 1,0 };
        opal_output(0,
            "[%lu,%lu,%lu]-[%lu,%lu,%lu] mca_oob_tcp_peer_start_connect: socket() failed: %s (%d)\n",
            ORTE_NAME_ARGS(orte_process_info.my_name),
            ORTE_NAME_ARGS(&(peer->peer_name)),
            strerror(opal_socket_errno),
            opal_socket_errno);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -