虫虫下载站 (Chongchong Download Site)

oob_tcp_peer.c
Language: C
Description: MPI stands for the Message Passing Interface. Written by the MPI Forum (a large committee comprising …
Page 1 of 3
/*
 * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
 *                         University Research and Technology
 *                         Corporation.  All rights reserved.
 * Copyright (c) 2004-2006 The University of Tennessee and The University
 *                         of Tennessee Research Foundation.  All rights
 *                         reserved.
 * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
 *                         University of Stuttgart.  All rights reserved.
 * Copyright (c) 2004-2005 The Regents of the University of California.
 *                         All rights reserved.
 * Copyright (c) 2006-2007 Los Alamos National Security, LLC.
 *                         All rights reserved.
 * $COPYRIGHT$
 *
 * Additional copyrights may follow
 *
 * $HEADER$
 *
 * In Windows, many of the socket functions return an EWOULDBLOCK
 * instead of things like EAGAIN, EINPROGRESS, etc. It has been
 * verified that this will not conflict with other error codes that
 * are returned by these functions under UNIX/Linux environments
 */

#include "orte_config.h"

#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif
#include <fcntl.h>
#ifdef HAVE_SYS_UIO_H
#include <sys/uio.h>
#endif
#ifdef HAVE_SYS_TYPES_H
#include <sys/types.h>
#endif
#include "opal/opal_socket_errno.h"
#ifdef HAVE_NETINET_IN_H
#include <netinet/in.h>
#endif
#ifdef HAVE_ARPA_INET_H
#include <arpa/inet.h>
#endif
#ifdef HAVE_NETINET_TCP_H
#include <netinet/tcp.h>
#endif

#include "orte/class/orte_proc_table.h"
#include "opal/util/output.h"
#include "orte/util/univ_info.h"
#include "orte/mca/gpr/gpr.h"
#include "orte/mca/ns/ns.h"
#include "orte/mca/errmgr/errmgr.h"
#include "oob_tcp.h"
#include "oob_tcp_peer.h"

static int  mca_oob_tcp_peer_start_connect(mca_oob_tcp_peer_t* peer);
static int  mca_oob_tcp_peer_event_init(mca_oob_tcp_peer_t* peer);
static void mca_oob_tcp_peer_connected(mca_oob_tcp_peer_t* peer);
static void mca_oob_tcp_peer_construct(mca_oob_tcp_peer_t* peer);
static void mca_oob_tcp_peer_destruct(mca_oob_tcp_peer_t* peer);
static int  mca_oob_tcp_peer_send_connect_ack(mca_oob_tcp_peer_t* peer);
static int  mca_oob_tcp_peer_recv_connect_ack(mca_oob_tcp_peer_t* peer);
static int  mca_oob_tcp_peer_recv_blocking(mca_oob_tcp_peer_t* peer, void* data, size_t size);
static int  mca_oob_tcp_peer_send_blocking(mca_oob_tcp_peer_t* peer, void* data, size_t size);
static void mca_oob_tcp_peer_recv_handler(int sd, short flags, void* user);
static void mca_oob_tcp_peer_send_handler(int sd, short flags, void* user);
static void mca_oob_tcp_peer_timer_handler(int sd, short flags, void* user);
static void mca_oob_tcp_peer_dump(mca_oob_tcp_peer_t* peer, const char* msg);

OBJ_CLASS_INSTANCE(
    mca_oob_tcp_peer_t,
    opal_free_list_item_t,
    mca_oob_tcp_peer_construct,
    mca_oob_tcp_peer_destruct);

/*
 * This is the constructor function for the mca_oob_tcp_peer
 * struct. Note that this function and OBJ_NEW should NEVER
 * be called directly. Instead, use mca_oob_tcp_add_peer
 *
 * @param peer a pointer to the mca_oob_tcp_peer_t struct to be initialized
 * @retval none
 */
static void mca_oob_tcp_peer_construct(mca_oob_tcp_peer_t* peer)
{
    OBJ_CONSTRUCT(&(peer->peer_send_queue), opal_list_t);
    OBJ_CONSTRUCT(&(peer->peer_lock), opal_mutex_t);
    memset(&peer->peer_send_event, 0, sizeof(peer->peer_send_event));
    memset(&peer->peer_recv_event, 0, sizeof(peer->peer_recv_event));
    memset(&peer->peer_timer_event, 0, sizeof(peer->peer_timer_event));
    opal_evtimer_set(&peer->peer_timer_event, mca_oob_tcp_peer_timer_handler, peer);
}

/*
 * This is the destructor function for the mca_oob_tcp_peer
 * struct. Note that this function and OBJ_RELEASE should NEVER
 * be called directly. Instead, use mca_oob_tcp_del_peer
 *
 * @param peer a pointer to the mca_oob_tcp_peer_t struct to be destroyed
 * @retval none
 */
static void mca_oob_tcp_peer_destruct(mca_oob_tcp_peer_t* peer)
{
    mca_oob_tcp_peer_shutdown(peer);
    OBJ_DESTRUCT(&(peer->peer_send_queue));
    OBJ_DESTRUCT(&(peer->peer_lock));
}

/*
 * Initialize events to be used by the peer instance for TCP select/poll callbacks.
 */
static int mca_oob_tcp_peer_event_init(mca_oob_tcp_peer_t* peer)
{
    memset(&peer->peer_recv_event, 0, sizeof(peer->peer_recv_event));
    memset(&peer->peer_send_event, 0, sizeof(peer->peer_send_event));
    opal_event_set(
        &peer->peer_recv_event,
        peer->peer_sd,
        OPAL_EV_READ|OPAL_EV_PERSIST,
        mca_oob_tcp_peer_recv_handler,
        peer);
    opal_event_set(
        &peer->peer_send_event,
        peer->peer_sd,
        OPAL_EV_WRITE|OPAL_EV_PERSIST,
        mca_oob_tcp_peer_send_handler,
        peer);
    return ORTE_SUCCESS;
}

/*
 *  Initiate the appropriate action based on the state of the connection
 *  to the peer.
 */
int mca_oob_tcp_peer_send(mca_oob_tcp_peer_t* peer, mca_oob_tcp_msg_t* msg)
{
    int rc = ORTE_SUCCESS;
    OPAL_THREAD_LOCK(&peer->peer_lock);
    switch(peer->peer_state) {
    case MCA_OOB_TCP_CONNECTING:
    case MCA_OOB_TCP_CONNECT_ACK:
    case MCA_OOB_TCP_CLOSED:
    case MCA_OOB_TCP_RESOLVE:
        /*
         * queue the message and attempt to resolve the peer address
         */
        opal_list_append(&peer->peer_send_queue, (opal_list_item_t*)msg);
        if(peer->peer_state == MCA_OOB_TCP_CLOSED) {
            peer->peer_state = MCA_OOB_TCP_RESOLVE;
            OPAL_THREAD_UNLOCK(&peer->peer_lock);
            return mca_oob_tcp_resolve(peer);
        }
        break;
    case MCA_OOB_TCP_FAILED:
        rc = ORTE_ERR_UNREACH;
        break;
    case MCA_OOB_TCP_CONNECTED:
        /*
         * start the message and queue if not completed
         */
        if (NULL != peer->peer_send_msg) {
            opal_list_append(&peer->peer_send_queue, (opal_list_item_t*)msg);
        } else {
            /* if the send does not complete */
            if(!mca_oob_tcp_msg_send_handler(msg, peer)) {
                peer->peer_send_msg = msg;
                opal_event_add(&peer->peer_send_event, 0);
            } else {
                mca_oob_tcp_msg_complete(msg, &peer->peer_name);
            }
        }
        break;
    }
    OPAL_THREAD_UNLOCK(&peer->peer_lock);
    return rc;
}

/*
 * Lookup a peer by name, create one if it doesn't exist.
 * @param name  Peer's globally unique identifier.
 * @retval      Pointer to the newly created structure or NULL on error.
 */
mca_oob_tcp_peer_t* mca_oob_tcp_peer_lookup(const orte_process_name_t* name)
{
    int rc;
    mca_oob_tcp_peer_t *peer, *old;
    if (NULL == name) { /* can't look this one up */
        return NULL;
    }

    OPAL_THREAD_LOCK(&mca_oob_tcp_component.tcp_lock);
    peer = (mca_oob_tcp_peer_t*)orte_hash_table_get_proc(
        &mca_oob_tcp_component.tcp_peers, name);
    if(NULL != peer && memcmp(&peer->peer_name, name, sizeof(peer->peer_name)) == 0) {
        OPAL_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_lock);
        return peer;
    }

    /* allocate from free list */
    MCA_OOB_TCP_PEER_ALLOC(peer, rc);
    if(NULL == peer) {
        OPAL_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_lock);
        return NULL;
    }

    /* initialize peer state */
    peer->peer_name = *name;
    peer->peer_addr = NULL;
    peer->peer_sd = -1;
    peer->peer_state = MCA_OOB_TCP_CLOSED;
    peer->peer_recv_msg = NULL;
    peer->peer_send_msg = NULL;
    peer->peer_retries = 0;

    /* add to lookup table */
    if(ORTE_SUCCESS != orte_hash_table_set_proc(&mca_oob_tcp_component.tcp_peers,
        &peer->peer_name, peer)) {
        MCA_OOB_TCP_PEER_RETURN(peer);
        OPAL_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_lock);
        return NULL;
    }

    /* if the peer list is over the maximum size, remove one unused peer */
    opal_list_prepend(&mca_oob_tcp_component.tcp_peer_list, (opal_list_item_t *) peer);
    if(mca_oob_tcp_component.tcp_peer_limit > 0 &&
       (int)opal_list_get_size(&mca_oob_tcp_component.tcp_peer_list) >
       mca_oob_tcp_component.tcp_peer_limit) {
        old = (mca_oob_tcp_peer_t *)
              opal_list_get_last(&mca_oob_tcp_component.tcp_peer_list);
        while(1) {
            if(0 == opal_list_get_size(&(old->peer_send_queue)) &&
               NULL == old->peer_recv_msg) {
                opal_list_remove_item(&mca_oob_tcp_component.tcp_peer_list,
                                      (opal_list_item_t *) old);
                MCA_OOB_TCP_PEER_RETURN(old);
                break;
            } else {
                old = (mca_oob_tcp_peer_t *) opal_list_get_prev(old);
                if(opal_list_get_begin(&mca_oob_tcp_component.tcp_peer_list) == (opal_list_item_t*)old) {
                    /* we tried, but we couldn't find one that was valid to get rid
                     * of. Oh well. */
                    break;
                }
            }
        }
    }
    OPAL_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_lock);
    return peer;
}

/*
 * Try connecting to a peer using all the addresses that peer exported.
 */
static int mca_oob_tcp_peer_try_connect(mca_oob_tcp_peer_t* peer)
{
    struct sockaddr_in inaddr;
    int rc;
    do {
        /* pick an address in round-robin fashion from the list exported by the peer */
        if((rc = mca_oob_tcp_addr_get_next(peer->peer_addr, &inaddr)) != ORTE_SUCCESS) {
            opal_output(0, "[%lu,%lu,%lu]-[%lu,%lu,%lu] mca_oob_tcp_peer_try_connect: "
                        "mca_oob_tcp_addr_get_next failed with error=%d",
                        ORTE_NAME_ARGS(orte_process_info.my_name),
                        ORTE_NAME_ARGS(&(peer->peer_name)),
                        rc);
            mca_oob_tcp_peer_close(peer);
            return ORTE_ERR_UNREACH;
        }
        if(mca_oob_tcp_component.tcp_debug >= OOB_TCP_DEBUG_CONNECT) {
            opal_output(0, "[%lu,%lu,%lu]-[%lu,%lu,%lu] mca_oob_tcp_peer_try_connect: "
                        "connecting port %d to: %s:%d\n",
                        ORTE_NAME_ARGS(orte_process_info.my_name),
                        ORTE_NAME_ARGS(&(peer->peer_name)),
                        ntohs(mca_oob_tcp_component.tcp_listen_port),
                        inet_ntoa(inaddr.sin_addr),
                        ntohs(inaddr.sin_port));
        }

        /* start the connect - will likely fail with EINPROGRESS */
        if(connect(peer->peer_sd,
                   (struct sockaddr*)&inaddr, sizeof(struct sockaddr_in)) < 0) {
            /* non-blocking so wait for completion */
            if(opal_socket_errno == EINPROGRESS || opal_socket_errno == EWOULDBLOCK) {
                opal_event_add(&peer->peer_send_event, 0);
                return ORTE_SUCCESS;
            }
            opal_output(0, "[%lu,%lu,%lu]-[%lu,%lu,%lu] mca_oob_tcp_peer_try_connect: "
                        "connect to %s:%d failed: %s (%d)",
                        ORTE_NAME_ARGS(orte_process_info.my_name),
                        ORTE_NAME_ARGS(&(peer->peer_name)),
                        inet_ntoa(inaddr.sin_addr),
                        ntohs(inaddr.sin_port),
                        strerror(opal_socket_errno),
                        opal_socket_errno);
            continue;
        }

        /* send our globally unique process identifier to the peer */
        if((rc = mca_oob_tcp_peer_send_connect_ack(peer)) == ORTE_SUCCESS) {
            peer->peer_state = MCA_OOB_TCP_CONNECT_ACK;
            opal_event_add(&peer->peer_recv_event, 0);
            return ORTE_SUCCESS;
        } else {
            opal_output(0, "[%lu,%lu,%lu]-[%lu,%lu,%lu] mca_oob_tcp_peer_try_connect: "
                        "mca_oob_tcp_peer_send_connect_ack to %s:%d failed with error=%d",
                        ORTE_NAME_ARGS(orte_process_info.my_name),
                        ORTE_NAME_ARGS(&(peer->peer_name)),
                        inet_ntoa(inaddr.sin_addr),
                        ntohs(inaddr.sin_port),
                        rc);
        }
    } while(peer->peer_addr->addr_next != 0);

    /* none of the interfaces worked */
    opal_output(0, "[%lu,%lu,%lu]-[%lu,%lu,%lu] mca_oob_tcp_peer_try_connect: "
                "connect to %s:%d failed, connecting over all interfaces failed!",
                ORTE_NAME_ARGS(orte_process_info.my_name),
                ORTE_NAME_ARGS(&(peer->peer_name)),
                inet_ntoa(inaddr.sin_addr),
                ntohs(inaddr.sin_port));
    mca_oob_tcp_peer_close(peer);
    return ORTE_ERR_UNREACH;
}

/*
 *  Start a connection to the peer. This will likely not complete,
 *  as the socket is set to non-blocking, so register for event
 *  notification of connect completion. On connection we send
 *  our globally unique process identifier to the peer and wait for
 *  the peer's response.
 */
static int mca_oob_tcp_peer_start_connect(mca_oob_tcp_peer_t* peer)
{
    int flags;

    /* create socket */
    peer->peer_state = MCA_OOB_TCP_CONNECTING;
    peer->peer_sd = socket(AF_INET, SOCK_STREAM, 0);
    if (peer->peer_sd < 0) {
        /* if we didn't successfully connect, wait 1 second and then try again */
        struct timeval tv = { 1, 0 };
        opal_output(0,
            "[%lu,%lu,%lu]-[%lu,%lu,%lu] mca_oob_tcp_peer_start_connect: socket() failed: %s (%d)\n",
            ORTE_NAME_ARGS(orte_process_info.my_name),
            ORTE_NAME_ARGS(&(peer->peer_name)),
            strerror(opal_socket_errno),
            opal_socket_errno);
        /* ... listing truncated here; continues on page 2 of 3 ... */
