⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 oob_tcp_peer.c

📁 MPI stands for the Message Passing Interface. Written by the MPI Forum (a large committee comprising
💻 C
📖 第 1 页 / 共 3 页
字号:
        if(retval < 0) {            if(opal_socket_errno != EINTR && opal_socket_errno != EAGAIN && opal_socket_errno != EWOULDBLOCK) {                opal_output(0, "[%lu,%lu,%lu]-[%lu,%lu,%lu] mca_oob_tcp_peer_send_blocking: send() failed: %s (%d)\n",                    ORTE_NAME_ARGS(orte_process_info.my_name),                    ORTE_NAME_ARGS(&(peer->peer_name)),                    strerror(opal_socket_errno),                    opal_socket_errno);                mca_oob_tcp_peer_close(peer);                return -1;            }            continue;        }        cnt += retval;    }    return cnt;}int mca_oob_tcp_peer_send_ident(mca_oob_tcp_peer_t* peer){    mca_oob_tcp_hdr_t hdr;    if(peer->peer_state != MCA_OOB_TCP_CONNECTED)        return ORTE_SUCCESS;    hdr.msg_src = *orte_process_info.my_name;    hdr.msg_dst = peer->peer_name;    hdr.msg_type = MCA_OOB_TCP_IDENT;    hdr.msg_size = 0;    hdr.msg_tag = 0;    MCA_OOB_TCP_HDR_HTON(&hdr);    if(mca_oob_tcp_peer_send_blocking(peer, &hdr, sizeof(hdr)) != sizeof(hdr))        return ORTE_ERR_UNREACH;    return ORTE_SUCCESS;}/* static void mca_oob_tcp_peer_recv_ident(mca_oob_tcp_peer_t* peer, mca_oob_tcp_hdr_t* hdr) *//* { *//*     OPAL_THREAD_LOCK(&mca_oob_tcp_component.tcp_lock); *//*     ompi_rb_tree_delete(&mca_oob_tcp_component.tcp_peer_tree, &peer->peer_name); *//*     peer->peer_name = hdr->msg_src; *//*     ompi_rb_tree_insert(&mca_oob_tcp_component.tcp_peer_tree, &peer->peer_name, peer); *//*     OPAL_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_lock); *//* } *//* * Dispatch to the appropriate action routine based on the state * of the connection with the peer. */static void mca_oob_tcp_peer_recv_handler(int sd, short flags, void* user){    mca_oob_tcp_peer_t* peer = (mca_oob_tcp_peer_t *)user;    OPAL_THREAD_LOCK(&peer->peer_lock);    switch(peer->peer_state) {        case MCA_OOB_TCP_CONNECT_ACK:         {            mca_oob_tcp_peer_recv_connect_ack(peer);            break;        }        case MCA_OOB_TCP_CONNECTED:         {            /* allocate a new message and setup for recv */            if(NULL == peer->peer_recv_msg) {                int rc;                mca_oob_tcp_msg_t* msg;                MCA_OOB_TCP_MSG_ALLOC(msg, rc);                if(NULL == msg) {                    opal_output(0, "[%lu,%lu,%lu]-[%lu,%lu,%lu] mca_oob_tcp_peer_recv_handler: unable to allocate recv message\n",                        ORTE_NAME_ARGS(orte_process_info.my_name),                        ORTE_NAME_ARGS(&(peer->peer_name)));                    return;                }                msg->msg_type = MCA_OOB_TCP_UNEXPECTED;                msg->msg_rc = 0;                msg->msg_flags = 0;                msg->msg_peer = peer->peer_name;                msg->msg_rwiov = mca_oob_tcp_msg_iov_alloc(msg,2);                msg->msg_rwbuf = NULL;                msg->msg_rwcnt = msg->msg_rwnum = 1;                msg->msg_rwptr = msg->msg_rwiov;                msg->msg_rwiov[0].iov_base = (ompi_iov_base_ptr_t)&msg->msg_hdr;                msg->msg_rwiov[0].iov_len = sizeof(msg->msg_hdr);                peer->peer_recv_msg = msg;            }            if (peer->peer_recv_msg &&                 mca_oob_tcp_msg_recv_handler(peer->peer_recv_msg, peer)) {               mca_oob_tcp_msg_t* msg = peer->peer_recv_msg;               peer->peer_recv_msg = NULL;               OPAL_THREAD_UNLOCK(&peer->peer_lock);               mca_oob_tcp_msg_recv_complete(msg, peer);               return;            }            break;        }        default:         {            opal_output(0, "[%lu,%lu,%lu]-[%lu,%lu,%lu] mca_oob_tcp_peer_recv_handler: invalid socket state(%d)",                     ORTE_NAME_ARGS(orte_process_info.my_name),                    ORTE_NAME_ARGS(&(peer->peer_name)),                    peer->peer_state);            mca_oob_tcp_peer_close(peer);            break;        }    }    OPAL_THREAD_UNLOCK(&peer->peer_lock);}/* * A file descriptor is available/ready for send. Check the state * of the socket and take the appropriate action. */static void mca_oob_tcp_peer_send_handler(int sd, short flags, void* user){    mca_oob_tcp_peer_t* peer = (mca_oob_tcp_peer_t *)user;    OPAL_THREAD_LOCK(&peer->peer_lock);    switch(peer->peer_state) {    case MCA_OOB_TCP_CONNECTING:        mca_oob_tcp_peer_complete_connect(peer);        break;    case MCA_OOB_TCP_CONNECTED:        {        while(peer->peer_send_msg != NULL) {            /* complete the current send */            mca_oob_tcp_msg_t* msg = peer->peer_send_msg;            if(mca_oob_tcp_msg_send_handler(msg, peer)) {                mca_oob_tcp_msg_complete(msg, &peer->peer_name);            } else {                break;            }            /* if current completed - progress any pending sends */            peer->peer_send_msg = (mca_oob_tcp_msg_t*)                opal_list_remove_first(&peer->peer_send_queue);        }                /* if nothing else to do unregister for send event notifications */        if(NULL == peer->peer_send_msg) {            opal_event_del(&peer->peer_send_event);        }        break;        }    default:        opal_output(0, "[%lu,%lu,%lu]-[%lu,%lu,%lu] mca_oob_tcp_peer_send_handler: invalid connection state (%d)",            ORTE_NAME_ARGS(orte_process_info.my_name),            ORTE_NAME_ARGS(&(peer->peer_name)),            peer->peer_state);        opal_event_del(&peer->peer_send_event);        break;    }    OPAL_THREAD_UNLOCK(&peer->peer_lock);}                                                                                                                       /* * Routine for debugging to print the connection state and socket options */static void mca_oob_tcp_peer_dump(mca_oob_tcp_peer_t* peer, const char* msg){    char src[64];    char dst[64];    char buff[255];    int sndbuf,rcvbuf,nodelay,flags;    struct sockaddr_in inaddr;    opal_socklen_t optlen;    opal_socklen_t addrlen = sizeof(struct sockaddr_in);                                                                                                                getsockname(peer->peer_sd, (struct sockaddr*)&inaddr, &addrlen);    sprintf(src, "%s", inet_ntoa(inaddr.sin_addr));    getpeername(peer->peer_sd, (struct sockaddr*)&inaddr, &addrlen);    sprintf(dst, "%s", inet_ntoa(inaddr.sin_addr));                                                                                                                if((flags = fcntl(peer->peer_sd, F_GETFL, 0)) < 0) {        opal_output(0, "mca_oob_tcp_peer_dump: fcntl(F_GETFL) failed: %s (%d)\n",                    strerror(opal_socket_errno),                    opal_socket_errno);    }                                                                                                            #if defined(SO_SNDBUF)    optlen = sizeof(sndbuf);    if(getsockopt(peer->peer_sd, SOL_SOCKET, SO_SNDBUF, (char *)&sndbuf, &optlen) < 0) {        opal_output(0, "mca_oob_tcp_peer_dump: SO_SNDBUF option: %s (%d)\n",                     strerror(opal_socket_errno),                    opal_socket_errno);    }#else    sndbuf = -1;#endif#if defined(SO_RCVBUF)    optlen = sizeof(rcvbuf);    if(getsockopt(peer->peer_sd, SOL_SOCKET, SO_RCVBUF, (char *)&rcvbuf, &optlen) < 0) {        opal_output(0, "mca_oob_tcp_peer_dump: SO_RCVBUF option: %s (%d)\n",                     strerror(opal_socket_errno),                    opal_socket_errno);    }#else    rcvbuf = -1;#endif#if defined(TCP_NODELAY)    optlen = sizeof(nodelay);    if(getsockopt(peer->peer_sd, IPPROTO_TCP, TCP_NODELAY, (char *)&nodelay, &optlen) < 0) {        opal_output(0, "mca_oob_tcp_peer_dump: TCP_NODELAY option: %s (%d)\n",                     strerror(opal_socket_errno),                    opal_socket_errno);    }#else    nodelay = 0;#endif    sprintf(buff, "[%lu,%lu,%lu]-[%lu,%lu,%lu] %s: %s - %s nodelay %d sndbuf %d rcvbuf %d flags %08x\n",        ORTE_NAME_ARGS(orte_process_info.my_name),        ORTE_NAME_ARGS(&(peer->peer_name)),        msg, src, dst, nodelay, sndbuf, rcvbuf, flags);    opal_output(0, buff);}/* * Accept incoming connection - if not already connected. We compare the name of the * peer to our own name using the ns.compare_fields function as we want this to be * a LITERAL comparison - i.e., there is no occasion when the peer's name should * be a wildcard value. * * To avoid competing reciprocal connection attempts, we only accept connections from * processes whose names are "greater" than our own. */bool mca_oob_tcp_peer_accept(mca_oob_tcp_peer_t* peer, int sd){    int cmpval;    OPAL_THREAD_LOCK(&peer->peer_lock);    cmpval = orte_ns.compare_fields(ORTE_NS_CMP_ALL, &peer->peer_name, orte_process_info.my_name);    if ((peer->peer_state == MCA_OOB_TCP_CLOSED) ||        (peer->peer_state == MCA_OOB_TCP_RESOLVE) ||        (peer->peer_state != MCA_OOB_TCP_CONNECTED &&         cmpval == ORTE_VALUE1_GREATER)) {        if(peer->peer_state != MCA_OOB_TCP_CLOSED) {            mca_oob_tcp_peer_close(peer);        }        peer->peer_sd = sd;        mca_oob_tcp_peer_event_init(peer);        if(mca_oob_tcp_peer_send_connect_ack(peer) != ORTE_SUCCESS) {            opal_output(0, "[%lu,%lu,%lu]-[%lu,%lu,%lu] mca_oob_tcp_peer_accept: "                "mca_oob_tcp_peer_send_connect_ack failed\n",                ORTE_NAME_ARGS(orte_process_info.my_name),                ORTE_NAME_ARGS(&(peer->peer_name)));            mca_oob_tcp_peer_close(peer);            OPAL_THREAD_UNLOCK(&peer->peer_lock);            return false;        }        mca_oob_tcp_peer_connected(peer);        opal_event_add(&peer->peer_recv_event, 0);        if(mca_oob_tcp_component.tcp_debug > 0) {            mca_oob_tcp_peer_dump(peer, "accepted");        }        OPAL_THREAD_UNLOCK(&peer->peer_lock);        return true;    }    OPAL_THREAD_UNLOCK(&peer->peer_lock);    return false;}/* * resolve process name to an actual internet address. */void mca_oob_tcp_peer_resolved(mca_oob_tcp_peer_t* peer, mca_oob_tcp_addr_t* addr){    OPAL_THREAD_LOCK(&peer->peer_lock);    peer->peer_addr = addr;    if((peer->peer_state == MCA_OOB_TCP_RESOLVE) ||       (peer->peer_state == MCA_OOB_TCP_CLOSED && opal_list_get_size(&peer->peer_send_queue))) {        mca_oob_tcp_peer_start_connect(peer);    }    OPAL_THREAD_UNLOCK(&peer->peer_lock);}/* * Callback on timeout - retry connection attempt. */static void mca_oob_tcp_peer_timer_handler(int sd, short flags, void* user){    /* start the connection to the peer */    mca_oob_tcp_peer_t* peer = (mca_oob_tcp_peer_t*)user;    OPAL_THREAD_LOCK(&peer->peer_lock);    if(peer->peer_state == MCA_OOB_TCP_CLOSED)        mca_oob_tcp_peer_start_connect(peer);    OPAL_THREAD_UNLOCK(&peer->peer_lock);}/* * Remove any references to the indicated message. */void mca_oob_tcp_peer_dequeue_msg(mca_oob_tcp_peer_t* peer, mca_oob_tcp_msg_t* msg){    opal_list_item_t* item;    OPAL_THREAD_LOCK(&peer->peer_lock);    if (peer->peer_send_msg == msg)        peer->peer_send_msg = NULL;    if (peer->peer_recv_msg == msg)        peer->peer_recv_msg = NULL;    for( item =  opal_list_get_first(&peer->peer_send_queue);         item != opal_list_get_end(&peer->peer_send_queue);         item = opal_list_get_next(item)) {        if(item == (opal_list_item_t*)msg) {            opal_list_remove_item(&peer->peer_send_queue, item);            break;        }    }    OPAL_THREAD_UNLOCK(&peer->peer_lock);}/** * Set socket buffering */void mca_oob_tcp_set_socket_options(int sd){    int optval;#if defined(TCP_NODELAY)    optval = 1;    if(setsockopt(sd, IPPROTO_TCP, TCP_NODELAY, (char *)&optval, sizeof(optval)) < 0) {        opal_output(0, "[%s:%d] setsockopt(TCP_NODELAY) failed: %s (%d)",                     __FILE__, __LINE__,                     strerror(opal_socket_errno),                    opal_socket_errno);    }#endif#if defined(SO_SNDBUF)    if(mca_oob_tcp_component.tcp_sndbuf > 0 &&       setsockopt(sd, SOL_SOCKET, SO_SNDBUF, (char *)&mca_oob_tcp_component.tcp_sndbuf, sizeof(int)) < 0) {        opal_output(0, "[%s:%d] setsockopt(SO_SNDBUF) failed: %s (%d)",                     __FILE__, __LINE__,                     strerror(opal_socket_errno),                    opal_socket_errno);    }#endif#if defined(SO_RCVBUF)    if(mca_oob_tcp_component.tcp_rcvbuf > 0 &&       setsockopt(sd, SOL_SOCKET, SO_RCVBUF, (char *)&mca_oob_tcp_component.tcp_rcvbuf, sizeof(int)) < 0) {        opal_output(0, "[%s:%d] setsockopt(SO_RCVBUF) failed: %s (%d)",                     __FILE__, __LINE__,                     strerror(opal_socket_errno),                    opal_socket_errno);    }#endif}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -