⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 oob_tcp.c

📁 MPI stands for the Message Passing Interface. Written by the MPI Forum (a large committee comprising
💻 C
📖 第 1 页 / 共 4 页
字号:
/* * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana *                         University Research and Technology *                         Corporation.  All rights reserved. * Copyright (c) 2004-2006 The University of Tennessee and The University *                         of Tennessee Research Foundation.  All rights *                         reserved. * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, *                         University of Stuttgart.  All rights reserved. * Copyright (c) 2004-2005 The Regents of the University of California. *                         All rights reserved. * Copyright (c) 2006-2007 Los Alamos National Security, LLC.  *                         All rights reserved. * $COPYRIGHT$ * * Additional copyrights may follow * * $HEADER$ * * In windows, many of the socket functions return an EWOULDBLOCK * instead of \ things like EAGAIN, EINPROGRESS, etc. It has been * verified that this will \ not conflict with other error codes that * are returned by these functions \ under UNIX/Linux environments  */#include "orte_config.h"#include "orte/orte_types.h"#ifdef HAVE_UNISTD_H#include <unistd.h>#endif#ifdef HAVE_SYS_TYPES_H#include <sys/types.h>#endif#include <fcntl.h>#ifdef HAVE_NETINET_IN_H#include <netinet/in.h>#endif#ifdef HAVE_ARPA_INET_H#include <arpa/inet.h>#endif#include "opal/opal_socket_errno.h"#include "opal/util/output.h"#include "opal/util/if.h"#include "orte/class/orte_proc_table.h"#include "orte/mca/oob/tcp/oob_tcp.h"#include "orte/mca/errmgr/errmgr.h"#include "orte/mca/ns/ns.h"#include "orte/mca/gpr/gpr.h"/* * Data structure for accepting connections. */struct mca_oob_tcp_event_t {    opal_list_item_t item;    opal_event_t event;};typedef struct mca_oob_tcp_event_t mca_oob_tcp_event_t;static void mca_oob_tcp_event_construct(mca_oob_tcp_event_t* event){    OPAL_THREAD_LOCK(&mca_oob_tcp_component.tcp_lock);    opal_list_append(&mca_oob_tcp_component.tcp_events, &event->item);    OPAL_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_lock);}static void mca_oob_tcp_event_destruct(mca_oob_tcp_event_t* event){    OPAL_THREAD_LOCK(&mca_oob_tcp_component.tcp_lock);    opal_list_remove_item(&mca_oob_tcp_component.tcp_events, &event->item);    OPAL_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_lock);}OBJ_CLASS_INSTANCE(    mca_oob_tcp_event_t,    opal_list_item_t,    mca_oob_tcp_event_construct,    mca_oob_tcp_event_destruct);/* * Local utility functions */static int  mca_oob_tcp_create_listen(void);static int  mca_oob_tcp_create_listen_thread(void);static void mca_oob_tcp_recv_handler(int sd, short flags, void* user);static void mca_oob_tcp_accept(void);struct mca_oob_tcp_subscription_t {    opal_list_item_t item;    orte_jobid_t jobid;    orte_gpr_subscription_id_t subid;};typedef struct mca_oob_tcp_subscription_t mca_oob_tcp_subscription_t;OBJ_CLASS_INSTANCE(    mca_oob_tcp_subscription_t,    opal_list_item_t,    NULL,    NULL);OBJ_CLASS_INSTANCE(                   mca_oob_tcp_pending_connection_t,                   opal_free_list_item_t,                   NULL,                   NULL);OBJ_CLASS_INSTANCE(mca_oob_tcp_device_t, opal_list_item_t, NULL, NULL);/* * Struct of function pointers and all that to let us be initialized */mca_oob_tcp_component_t mca_oob_tcp_component = {  {    {        MCA_OOB_BASE_VERSION_1_0_0,        "tcp", /* MCA module name */        1,  /* MCA component major version */        0,  /* MCA component minor version */        0,  /* MCA component release version */        mca_oob_tcp_component_open,  /* component open */        mca_oob_tcp_component_close /* component close */    },    {        false /* checkpoint / restart */    },    mca_oob_tcp_component_init  }};static mca_oob_t mca_oob_tcp = {    mca_oob_tcp_get_addr,    mca_oob_tcp_set_addr,    mca_oob_tcp_ping,    mca_oob_tcp_send,    mca_oob_tcp_recv,    mca_oob_tcp_send_nb,    mca_oob_tcp_recv_nb,    mca_oob_tcp_recv_cancel,    mca_oob_tcp_init,    mca_oob_tcp_fini,    mca_oob_xcast};/* * Utility function to register/lookup module parameters. */static inline int mca_oob_tcp_param_register_int(    const char* param_name,    int default_value){    int id = mca_base_param_register_int("oob","tcp",param_name,NULL,default_value);    int param_value = default_value;    mca_base_param_lookup_int(id,&param_value);    return param_value;}static inline char* mca_oob_tcp_param_register_str(    const char* param_name,    const char* default_value){    int id = mca_base_param_register_string("oob","tcp",param_name,NULL,default_value);    char* param_value = NULL;    mca_base_param_lookup_string(id,&param_value);    return param_value;}/* * Initialize global variables used w/in this module. */int mca_oob_tcp_component_open(void){    char *listen_type, *str;    int tmp;#ifdef __WINDOWS__    WSADATA win_sock_data;    if (WSAStartup(MAKEWORD(2,2), &win_sock_data) != 0) {        opal_output (0, "mca_oob_tcp_component_init: failed to initialise windows sockets: error %d\n", WSAGetLastError());        return ORTE_ERROR;    }#endif    OBJ_CONSTRUCT(&mca_oob_tcp_component.tcp_subscriptions, opal_list_t);    OBJ_CONSTRUCT(&mca_oob_tcp_component.tcp_peer_list,     opal_list_t);    OBJ_CONSTRUCT(&mca_oob_tcp_component.tcp_peers,         opal_hash_table_t);    OBJ_CONSTRUCT(&mca_oob_tcp_component.tcp_peer_names,    opal_hash_table_t);    OBJ_CONSTRUCT(&mca_oob_tcp_component.tcp_peer_free,     opal_free_list_t);    OBJ_CONSTRUCT(&mca_oob_tcp_component.tcp_msgs,          opal_free_list_t);    OBJ_CONSTRUCT(&mca_oob_tcp_component.tcp_lock,          opal_mutex_t);    OBJ_CONSTRUCT(&mca_oob_tcp_component.tcp_events,        opal_list_t);    OBJ_CONSTRUCT(&mca_oob_tcp_component.tcp_msg_post,      opal_list_t);    OBJ_CONSTRUCT(&mca_oob_tcp_component.tcp_msg_recv,      opal_list_t);    OBJ_CONSTRUCT(&mca_oob_tcp_component.tcp_msg_completed, opal_list_t);    OBJ_CONSTRUCT(&mca_oob_tcp_component.tcp_match_lock,    opal_mutex_t);    OBJ_CONSTRUCT(&mca_oob_tcp_component.tcp_match_cond,    opal_condition_t);    OBJ_CONSTRUCT(&mca_oob_tcp_component.tcp_listen_thread, opal_thread_t);    OBJ_CONSTRUCT(&mca_oob_tcp_component.tcp_available_devices, opal_list_t);    OBJ_CONSTRUCT(&mca_oob_tcp_component.tcp_pending_connections_fl, opal_free_list_t);    OBJ_CONSTRUCT(&mca_oob_tcp_component.tcp_pending_connections, opal_list_t);    OBJ_CONSTRUCT(&mca_oob_tcp_component.tcp_copy_out_connections, opal_list_t);    OBJ_CONSTRUCT(&mca_oob_tcp_component.tcp_copy_in_connections, opal_list_t);    OBJ_CONSTRUCT(&mca_oob_tcp_component.tcp_connections_return, opal_list_t);    OBJ_CONSTRUCT(&mca_oob_tcp_component.tcp_connections_return_copy, opal_list_t);    OBJ_CONSTRUCT(&mca_oob_tcp_component.tcp_pending_connections_lock, opal_mutex_t);    /* register oob module parameters */    mca_oob_tcp_component.tcp_peer_limit =        mca_oob_tcp_param_register_int("peer_limit", -1);    mca_oob_tcp_component.tcp_peer_retries =        mca_oob_tcp_param_register_int("peer_retries", 60);    mca_oob_tcp_component.tcp_debug =        mca_oob_tcp_param_register_int("debug", 0);    mca_oob_tcp_component.tcp_sndbuf =        mca_oob_tcp_param_register_int("sndbuf", 128*1024);    mca_oob_tcp_component.tcp_rcvbuf =        mca_oob_tcp_param_register_int("rcvbuf", 128*1024);    mca_base_param_reg_string(&mca_oob_tcp_component.super.oob_base,                              "if_include",                              "Comma-delimited list of TCP interfaces to use",                              false, false, NULL,                               &mca_oob_tcp_component.tcp_include);    mca_base_param_reg_string(&mca_oob_tcp_component.super.oob_base,                              "include",                              "Obsolete synonym for oob_tcp_if_include",                              true, false, NULL, &str);    if (NULL != str) {        if (NULL == mca_oob_tcp_component.tcp_include) {            mca_oob_tcp_component.tcp_include = str;        } else {            free(str);        }    }    mca_base_param_reg_string(&mca_oob_tcp_component.super.oob_base,                              "if_exclude",                              "Comma-delimited list of TCP interfaces to exclude",                              false, false, NULL,                               &mca_oob_tcp_component.tcp_exclude);    mca_base_param_reg_string(&mca_oob_tcp_component.super.oob_base,                              "exclude",                              "Obsolete synonym for oob_tcp_if_exclude",                              true, false, NULL, &str);    if (NULL != str) {        if (NULL == mca_oob_tcp_component.tcp_exclude) {            mca_oob_tcp_component.tcp_exclude = str;        } else {            free(str);        }    }    mca_base_param_reg_int(&mca_oob_tcp_component.super.oob_base,                           "connect_sleep",                           "Enable (1) / disable (0) random sleep for connection wireup",                           false,                           false,                           1,                           &mca_oob_tcp_component.connect_sleep);    mca_base_param_reg_string(&mca_oob_tcp_component.super.oob_base,                              "listen_mode",                              "Mode for HNP to accept incoming connections: event, listen_thread",                              false,                              false,                              "event",                              &listen_type);        if (0 == strcmp(listen_type, "event")) {        mca_oob_tcp_component.tcp_listen_type = OOB_TCP_EVENT;    } else if (0 == strcmp(listen_type, "listen_thread")) {        mca_oob_tcp_component.tcp_listen_type = OOB_TCP_LISTEN_THREAD;    } else {        opal_output(0, "Invalid value for oob_tcp_listen_mode parameter: %s",                    listen_type);        return ORTE_ERROR;    }    mca_base_param_reg_int(&mca_oob_tcp_component.super.oob_base,                           "listen_thread_max_queue",                           "High water mark for queued accepted socket list size",                           false,                           false,                           10,                           &mca_oob_tcp_component.tcp_copy_max_size);    mca_base_param_reg_int(&mca_oob_tcp_component.super.oob_base,                           "listen_thread_max_time",                           "Maximum amount of time (in milliseconds) to wait between processing accepted socket list",                           false,                           false,                           10,                           &tmp);#if OPAL_TIMER_USEC_NATIVE    mca_oob_tcp_component.tcp_copy_delta = tmp * 1000;#else    mca_oob_tcp_component.tcp_copy_delta = tmp *        opal_timer_base_get_freq() / 1000;#endif    mca_base_param_reg_int(&mca_oob_tcp_component.super.oob_base,                           "accept_spin_count",                           "Number of times to let accept return EWOULDBLOCK before updating accepted socket list",                           false,                           false,                           10,                           &mca_oob_tcp_component.tcp_copy_spin_count);    /* initialize state */    mca_oob_tcp_component.tcp_shutdown = false;    mca_oob_tcp_component.tcp_listen_sd = -1;    mca_oob_tcp_component.tcp_match_count = 0;    mca_oob_tcp_component.tcp_last_copy_time = 0;    return ORTE_SUCCESS;}/* * Cleanup of global variables used by this module. */int mca_oob_tcp_component_close(void){    opal_list_item_t *item;#ifdef __WINDOWS__    WSACleanup();#endif    /* cleanup resources */    while (NULL != (item = opal_list_remove_first(&mca_oob_tcp_component.tcp_available_devices))) {        OBJ_RELEASE(item);    }    OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_available_devices);    OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_pending_connections_lock);    OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_connections_return_copy);    OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_connections_return);    OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_copy_out_connections);    OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_pending_connections);    OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_pending_connections_fl);    OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_listen_thread);    OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_match_cond);    OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_match_lock);    OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_msg_completed);    OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_msg_recv);    OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_msg_post);    OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_events);    OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_lock);    OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_msgs);    OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_peer_free);    OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_peer_names);    OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_peers);    OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_subscriptions);    OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_peer_list);    return ORTE_SUCCESS;}/* *  Called by mca_oob_tcp_recv_handler() when the TCP listen *  socket has pending connection requests. Accept incoming *  requests and queue for completion of the connection handshake.*/static void mca_oob_tcp_accept(void){    while(true) {        opal_socklen_t addrlen = sizeof(struct sockaddr_in);        struct sockaddr_in addr;        mca_oob_tcp_event_t* event;

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -