⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 oob_tcp.c

📁 MPI stands for the Message Passing Interface. Written by the MPI Forum (a large committee comprising
💻 C
📖 第 1 页 / 共 4 页
字号:
        int sd;        sd = accept(mca_oob_tcp_component.tcp_listen_sd, (struct sockaddr*)&addr, &addrlen);        if(sd < 0) {            if(opal_socket_errno == EINTR)                continue;            if(opal_socket_errno != EAGAIN && opal_socket_errno != EWOULDBLOCK)                opal_output(0, "mca_oob_tcp_accept: accept() failed: %s (%d).",                             strerror(opal_socket_errno), opal_socket_errno);            return;        }        /* setup socket options */        mca_oob_tcp_set_socket_options(sd);        /* log the accept */        if (mca_oob_tcp_component.tcp_debug >= OOB_TCP_DEBUG_CONNECT) {            opal_output(0, "[%lu,%lu,%lu] mca_oob_tcp_accept: %s:%d\n",                        ORTE_NAME_ARGS(orte_process_info.my_name),                        inet_ntoa(addr.sin_addr),                        addr.sin_port);        }        /* wait for receipt of peers process identifier to complete this connection */        event = OBJ_NEW(mca_oob_tcp_event_t);        opal_event_set(&event->event, sd, OPAL_EV_READ, mca_oob_tcp_recv_handler, event);        opal_event_add(&event->event, 0);    }}/* * Create a listen socket and bind to all interfaces */static int mca_oob_tcp_create_listen(void){    int flags;    struct sockaddr_in inaddr;    opal_socklen_t addrlen;    /* create a listen socket for incoming connections */    mca_oob_tcp_component.tcp_listen_sd = socket(AF_INET, SOCK_STREAM, 0);    if(mca_oob_tcp_component.tcp_listen_sd < 0) {        opal_output(0,"mca_oob_tcp_component_init: socket() failed: %s (%d)",                     strerror(opal_socket_errno), opal_socket_errno);        return ORTE_ERROR;    }    /* setup socket options */    mca_oob_tcp_set_socket_options(mca_oob_tcp_component.tcp_listen_sd);    /* bind address */    memset(&inaddr, 0, sizeof(inaddr));    inaddr.sin_family = AF_INET;    inaddr.sin_addr.s_addr = INADDR_ANY;    inaddr.sin_port = 0;    if(bind(mca_oob_tcp_component.tcp_listen_sd, (struct sockaddr*)&inaddr, sizeof(inaddr)) < 0) {        opal_output(0,"mca_oob_tcp_create_listen: bind() failed: %s (%d)",                     strerror(opal_socket_errno), opal_socket_errno);        return ORTE_ERROR;    }    /* resolve system assigned port */    addrlen = sizeof(struct sockaddr_in);    if(getsockname(mca_oob_tcp_component.tcp_listen_sd, (struct sockaddr*)&inaddr, &addrlen) < 0) {        opal_output(0, "mca_oob_tcp_create_listen: getsockname(): %s (%d)",                     strerror(opal_socket_errno), opal_socket_errno);        return ORTE_ERROR;    }    mca_oob_tcp_component.tcp_listen_port = inaddr.sin_port;    /* setup listen backlog to maximum allowed by kernel */    if(listen(mca_oob_tcp_component.tcp_listen_sd, SOMAXCONN) < 0) {        opal_output(0, "mca_oob_tcp_component_init: listen(): %s (%d)",                     strerror(opal_socket_errno), opal_socket_errno);        return ORTE_ERROR;    }    /* set socket up to be non-blocking, otherwise accept could block */    if((flags = fcntl(mca_oob_tcp_component.tcp_listen_sd, F_GETFL, 0)) < 0) {        opal_output(0, "mca_oob_tcp_component_init: fcntl(F_GETFL) failed: %s (%d)",                     strerror(opal_socket_errno), opal_socket_errno);        return ORTE_ERROR;    } else {        flags |= O_NONBLOCK;        if(fcntl(mca_oob_tcp_component.tcp_listen_sd, F_SETFL, flags) < 0) {            opal_output(0, "mca_oob_tcp_component_init: fcntl(F_SETFL) failed: %s (%d)",                         strerror(opal_socket_errno), opal_socket_errno);            return ORTE_ERROR;        }    }    /* register listen port */    opal_event_set(        &mca_oob_tcp_component.tcp_recv_event,        mca_oob_tcp_component.tcp_listen_sd,        OPAL_EV_READ|OPAL_EV_PERSIST,        mca_oob_tcp_recv_handler,        0);    opal_event_add(&mca_oob_tcp_component.tcp_recv_event, 0);    return ORTE_SUCCESS;}static void* mca_oob_tcp_listen_thread(opal_object_t *obj){    int rc, count;    opal_socklen_t addrlen = sizeof(struct sockaddr_in);    opal_free_list_item_t *fl_item;    mca_oob_tcp_pending_connection_t *item;    struct timeval timeout;    fd_set readfds;    while (false == mca_oob_tcp_component.tcp_shutdown) {        count = 0;        FD_ZERO(&readfds);        FD_SET(mca_oob_tcp_component.tcp_listen_sd, &readfds);        timeout.tv_sec = 0;        timeout.tv_usec = 10000;        rc = select(mca_oob_tcp_component.tcp_listen_sd + 1, &readfds,                    NULL, NULL, &timeout);        if (rc < 0) {            if (EAGAIN != opal_socket_errno && EINTR != opal_socket_errno) {                perror("select");            }            continue;        }        while (count < mca_oob_tcp_component.tcp_copy_spin_count &&                opal_list_get_size(&mca_oob_tcp_component.tcp_copy_in_connections) <                (size_t) mca_oob_tcp_component.tcp_copy_max_size) {            OPAL_FREE_LIST_WAIT(&mca_oob_tcp_component.tcp_pending_connections_fl,                                 fl_item, rc);            item = (mca_oob_tcp_pending_connection_t*) fl_item;            item->fd = accept(mca_oob_tcp_component.tcp_listen_sd,                               (struct sockaddr*)&(item->addr), &addrlen);            if(item->fd < 0) {                OPAL_FREE_LIST_RETURN(&mca_oob_tcp_component.tcp_pending_connections_fl,                                       fl_item);                if (mca_oob_tcp_component.tcp_shutdown) return NULL;                if(opal_socket_errno != EAGAIN || opal_socket_errno != EWOULDBLOCK) {                    opal_output(0, "mca_oob_tcp_accept: accept() failed: %s (%d).",                                strerror(opal_socket_errno), opal_socket_errno);                    close(item->fd);                    return NULL;                }                count++;                continue;            }            if (mca_oob_tcp_component.tcp_debug >= OOB_TCP_DEBUG_CONNECT) {                opal_output(0, "[%lu,%lu,%lu] mca_oob_tcp_listen_thread: (%d, %d) %s:%d\n",                            ORTE_NAME_ARGS(orte_process_info.my_name),                            item->fd, opal_socket_errno,                            inet_ntoa(item->addr.sin_addr),                            item->addr.sin_port);            }            opal_list_append(&mca_oob_tcp_component.tcp_copy_in_connections,                             (opal_list_item_t*) item);        }        if (0 < opal_list_get_size(&mca_oob_tcp_component.tcp_copy_in_connections)) {            opal_mutex_lock(&mca_oob_tcp_component.tcp_pending_connections_lock);            opal_list_join(&mca_oob_tcp_component.tcp_pending_connections,                           opal_list_get_end(&mca_oob_tcp_component.tcp_pending_connections),                           &mca_oob_tcp_component.tcp_copy_in_connections);            while (NULL != (fl_item = (opal_free_list_item_t*) opal_list_remove_first(&mca_oob_tcp_component.tcp_connections_return_copy))) {                OPAL_FREE_LIST_RETURN(&mca_oob_tcp_component.tcp_pending_connections_fl, fl_item);            }            opal_mutex_unlock(&mca_oob_tcp_component.tcp_pending_connections_lock);        }    }    return NULL;}/* called from opal_progress() to create the oob contact information   for the file descriptors accepted() by the accept thread. */static int mca_oob_tcp_listen_progress(void){    int count = 0;    mca_oob_tcp_pending_connection_t *item;    mca_oob_tcp_event_t* event;#if OPAL_TIMER_USEC_NATIVE    opal_timer_t now = opal_timer_base_get_usec();#else    opal_timer_t now = opal_timer_base_get_cycles();#endif  /* OPAL_TIMER_USEC_NATIVE */    /* if we've not pulled pending connections for a while OR we've       hit the high water mark of pending connections, grab all the       pending connections */    if ((now - mca_oob_tcp_component.tcp_last_copy_time >          mca_oob_tcp_component.tcp_copy_delta) ||        ((size_t) mca_oob_tcp_component.tcp_copy_max_size <         opal_list_get_size(&mca_oob_tcp_component.tcp_pending_connections))) {        /* copy the pending connections from the list the accept           thread is inserting into into a temporary list for us to           process from.  Then copy the returned free list items into           that thread's return list, so it can free them soonish.           This is an O(1) operation, so we minimize the lock time. */        opal_mutex_lock(&mca_oob_tcp_component.tcp_pending_connections_lock);        opal_list_join(&mca_oob_tcp_component.tcp_copy_out_connections,                       opal_list_get_end(&mca_oob_tcp_component.tcp_copy_out_connections),                       &mca_oob_tcp_component.tcp_pending_connections);        opal_list_join(&mca_oob_tcp_component.tcp_connections_return_copy,                       opal_list_get_end(&mca_oob_tcp_component.tcp_connections_return_copy),                       &mca_oob_tcp_component.tcp_connections_return);        opal_mutex_unlock(&mca_oob_tcp_component.tcp_pending_connections_lock);        /* process al the connections */        while (NULL != (item = (mca_oob_tcp_pending_connection_t*)                         opal_list_remove_first(&mca_oob_tcp_component.                                               tcp_copy_out_connections))) {            /* setup socket options */            mca_oob_tcp_set_socket_options(item->fd);            /* log the accept */            if (mca_oob_tcp_component.tcp_debug >= OOB_TCP_DEBUG_CONNECT) {                opal_output(0, "[%lu,%lu,%lu] mca_oob_tcp_listen_progress: %s:%d\n",                            ORTE_NAME_ARGS(orte_process_info.my_name),                            inet_ntoa(item->addr.sin_addr),                            item->addr.sin_port);            }            /* wait for receipt of peers process identifier to               complete this connection */            event = OBJ_NEW(mca_oob_tcp_event_t);            opal_event_set(&event->event, item->fd, OPAL_EV_READ, mca_oob_tcp_recv_handler, event);            opal_event_add(&event->event, 0);            /* put on the needs returning list */            opal_list_append(&mca_oob_tcp_component.tcp_connections_return,                              (opal_list_item_t*) item);            count++;        }        mca_oob_tcp_component.tcp_last_copy_time = now;    }    return count;}static int mca_oob_tcp_create_listen_thread(void){    struct sockaddr_in inaddr;    opal_socklen_t addrlen;    int flags;    /* create a listen socket for incoming connections */    mca_oob_tcp_component.tcp_listen_sd = socket(AF_INET, SOCK_STREAM, 0);    if(mca_oob_tcp_component.tcp_listen_sd < 0) {        opal_output(0,"mca_oob_tcp_component_init: socket() failed: %s (%d)",                    strerror(opal_socket_errno), opal_socket_errno);        return ORTE_ERROR;    }    /* setup socket options */    mca_oob_tcp_set_socket_options(mca_oob_tcp_component.tcp_listen_sd);    /* bind address */    memset(&inaddr, 0, sizeof(inaddr));    inaddr.sin_family = AF_INET;    inaddr.sin_addr.s_addr = INADDR_ANY;    inaddr.sin_port = 0;    if(bind(mca_oob_tcp_component.tcp_listen_sd, (struct sockaddr*)&inaddr, sizeof(inaddr)) < 0) {        opal_output(0,"mca_oob_tcp_create_listen: bind() failed: %s (%d)",                     strerror(opal_socket_errno), opal_socket_errno);        return ORTE_ERROR;    }    /* resolve system assigned port */    addrlen = sizeof(struct sockaddr_in);    if(getsockname(mca_oob_tcp_component.tcp_listen_sd, (struct sockaddr*)&inaddr, &addrlen) < 0) {        opal_output(0, "mca_oob_tcp_create_listen: getsockname() failed: %s (%d)",                     strerror(opal_socket_errno), opal_socket_errno);        return ORTE_ERROR;    }    mca_oob_tcp_component.tcp_listen_port = inaddr.sin_port;    /* setup listen backlog to maximum allowed by kernel */    if(listen(mca_oob_tcp_component.tcp_listen_sd, SOMAXCONN) < 0) {        opal_output(0, "mca_oob_tcp_component_init: listen() failed: %s (%d)",                     strerror(opal_socket_errno), opal_socket_errno);        return ORTE_ERROR;    }    /* set socket up to be non-blocking, otherwise accept could block */    if((flags = fcntl(mca_oob_tcp_component.tcp_listen_sd, F_GETFL, 0)) < 0) {        opal_output(0, "mca_oob_tcp_component_init: fcntl(F_GETFL) failed: %s (%d)",                     strerror(opal_socket_errno), opal_socket_errno);        return ORTE_ERROR;    } else {        flags |= O_NONBLOCK;        if(fcntl(mca_oob_tcp_component.tcp_listen_sd, F_SETFL, flags) < 0) {            opal_output(0, "mca_oob_tcp_component_init: fcntl(F_SETFL) failed: %s (%d)",                         strerror(opal_socket_errno), opal_socket_errno);            return ORTE_ERROR;        }    }    /* start the listen thread */    mca_oob_tcp_component.tcp_listen_thread.t_run = mca_oob_tcp_listen_thread;    mca_oob_tcp_component.tcp_listen_thread.t_arg = NULL;    return opal_thread_start(&mca_oob_tcp_component.tcp_listen_thread);}/* * Handle probe */static void mca_oob_tcp_recv_probe(int sd, mca_oob_tcp_hdr_t* hdr){    unsigned char* ptr = (unsigned char*)hdr;    size_t cnt = 0;    hdr->msg_type = MCA_OOB_TCP_PROBE;    hdr->msg_dst = hdr->msg_src;    hdr->msg_src = *orte_process_info.my_name;    MCA_OOB_TCP_HDR_HTON(hdr);    while(cnt < sizeof(mca_oob_tcp_hdr_t)) {        int retval = send(sd, (char *)ptr+cnt, sizeof(mca_oob_tcp_hdr_t)-cnt, 0);        if(retval < 0) {            if(opal_socket_errno != EINTR && opal_socket_errno != EAGAIN && opal_socket_errno != EWOULDBLOCK) {                opal_output(0, "[%lu,%lu,%lu]-[%lu,%lu,%lu] mca_oob_tcp_peer_recv_probe: send() failed: %s (%d)\n",                    ORTE_NAME_ARGS(orte_process_info.my_name),                    ORTE_NAME_ARGS(&(hdr->msg_src)),                    strerror(opal_socket_errno),                    opal_socket_errno);                CLOSE_THE_SOCKET(sd);                return;            }            continue;        }        cnt += retval;    }    CLOSE_THE_SOCKET(sd);}/* * Handle connection request */static void mca_oob_tcp_recv_connect(int sd, mca_oob_tcp_hdr_t* hdr){    mca_oob_tcp_peer_t* peer;    int flags;    int cmpval;    /* now set socket up to be non-blocking */    if((flags = fcntl(sd, F_GETFL, 0)) < 0) {        opal_output(0, "[%lu,%lu,%lu] mca_oob_tcp_recv_handler: fcntl(F_GETFL) failed: %s (%d)",               ORTE_NAME_ARGS(orte_process_info.my_name), strerror(opal_socket_errno), opal_socket_errno);    } else {        flags |= O_NONBLOCK;        if(fcntl(sd, F_SETFL, flags) < 0) {            opal_output(0, "[%lu,%lu,%lu] mca_oob_tcp_recv_handler: fcntl(F_SETFL) failed: %s (%d)",                ORTE_NAME_ARGS(orte_process_info.my_name), strerror(opal_socket_errno), opal_socket_errno);        }    }    /* check for invalid name - if this is true - we allocate a name from the name server     * and return to the peer     */    cmpval = orte_ns.compare_fields(ORTE_NS_CMP_ALL, &hdr->msg_src, ORTE_NAME_INVALID);    if (cmpval == ORTE_EQUAL) {        if (ORTE_SUCCESS != orte_ns.create_jobid(&hdr->msg_src.jobid, NULL)) {           return;        }        if (ORTE_SUCCESS != orte_ns.reserve_range(hdr->msg_src.jobid, 1, &hdr->msg_src.vpid)) {

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -