📄 oob_tcp.c
字号:
/*
 * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
 *                         University Research and Technology
 *                         Corporation.  All rights reserved.
 * Copyright (c) 2004-2006 The University of Tennessee and The University
 *                         of Tennessee Research Foundation.  All rights
 *                         reserved.
 * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
 *                         University of Stuttgart.  All rights reserved.
 * Copyright (c) 2004-2005 The Regents of the University of California.
 *                         All rights reserved.
 * Copyright (c) 2006-2007 Los Alamos National Security, LLC.
 *                         All rights reserved.
 * $COPYRIGHT$
 *
 * Additional copyrights may follow
 *
 * $HEADER$
 *
 * In windows, many of the socket functions return an EWOULDBLOCK
 * instead of things like EAGAIN, EINPROGRESS, etc. It has been
 * verified that this will not conflict with other error codes that
 * are returned by these functions under UNIX/Linux environments
 */

#include "orte_config.h"
#include "orte/orte_types.h"

#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif
#ifdef HAVE_SYS_TYPES_H
#include <sys/types.h>
#endif
#include <fcntl.h>
#ifdef HAVE_NETINET_IN_H
#include <netinet/in.h>
#endif
#ifdef HAVE_ARPA_INET_H
#include <arpa/inet.h>
#endif

#include "opal/opal_socket_errno.h"
#include "opal/util/output.h"
#include "opal/util/if.h"
#include "orte/class/orte_proc_table.h"
#include "orte/mca/oob/tcp/oob_tcp.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/ns/ns.h"
#include "orte/mca/gpr/gpr.h"

/*
 * Data structure for accepting connections.
*/struct mca_oob_tcp_event_t { opal_list_item_t item; opal_event_t event;};typedef struct mca_oob_tcp_event_t mca_oob_tcp_event_t;static void mca_oob_tcp_event_construct(mca_oob_tcp_event_t* event){ OPAL_THREAD_LOCK(&mca_oob_tcp_component.tcp_lock); opal_list_append(&mca_oob_tcp_component.tcp_events, &event->item); OPAL_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_lock);}static void mca_oob_tcp_event_destruct(mca_oob_tcp_event_t* event){ OPAL_THREAD_LOCK(&mca_oob_tcp_component.tcp_lock); opal_list_remove_item(&mca_oob_tcp_component.tcp_events, &event->item); OPAL_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_lock);}OBJ_CLASS_INSTANCE( mca_oob_tcp_event_t, opal_list_item_t, mca_oob_tcp_event_construct, mca_oob_tcp_event_destruct);/* * Local utility functions */static int mca_oob_tcp_create_listen(void);static int mca_oob_tcp_create_listen_thread(void);static void mca_oob_tcp_recv_handler(int sd, short flags, void* user);static void mca_oob_tcp_accept(void);struct mca_oob_tcp_subscription_t { opal_list_item_t item; orte_jobid_t jobid; orte_gpr_subscription_id_t subid;};typedef struct mca_oob_tcp_subscription_t mca_oob_tcp_subscription_t;OBJ_CLASS_INSTANCE( mca_oob_tcp_subscription_t, opal_list_item_t, NULL, NULL);OBJ_CLASS_INSTANCE( mca_oob_tcp_pending_connection_t, opal_free_list_item_t, NULL, NULL);OBJ_CLASS_INSTANCE(mca_oob_tcp_device_t, opal_list_item_t, NULL, NULL);/* * Struct of function pointers and all that to let us be initialized */mca_oob_tcp_component_t mca_oob_tcp_component = { { { MCA_OOB_BASE_VERSION_1_0_0, "tcp", /* MCA module name */ 1, /* MCA component major version */ 0, /* MCA component minor version */ 0, /* MCA component release version */ mca_oob_tcp_component_open, /* component open */ mca_oob_tcp_component_close /* component close */ }, { false /* checkpoint / restart */ }, mca_oob_tcp_component_init }};static mca_oob_t mca_oob_tcp = { mca_oob_tcp_get_addr, mca_oob_tcp_set_addr, mca_oob_tcp_ping, mca_oob_tcp_send, mca_oob_tcp_recv, 
mca_oob_tcp_send_nb, mca_oob_tcp_recv_nb, mca_oob_tcp_recv_cancel, mca_oob_tcp_init, mca_oob_tcp_fini, mca_oob_xcast};/* * Utility function to register/lookup module parameters. */static inline int mca_oob_tcp_param_register_int( const char* param_name, int default_value){ int id = mca_base_param_register_int("oob","tcp",param_name,NULL,default_value); int param_value = default_value; mca_base_param_lookup_int(id,¶m_value); return param_value;}static inline char* mca_oob_tcp_param_register_str( const char* param_name, const char* default_value){ int id = mca_base_param_register_string("oob","tcp",param_name,NULL,default_value); char* param_value = NULL; mca_base_param_lookup_string(id,¶m_value); return param_value;}/* * Initialize global variables used w/in this module. */int mca_oob_tcp_component_open(void){ char *listen_type, *str; int tmp;#ifdef __WINDOWS__ WSADATA win_sock_data; if (WSAStartup(MAKEWORD(2,2), &win_sock_data) != 0) { opal_output (0, "mca_oob_tcp_component_init: failed to initialise windows sockets: error %d\n", WSAGetLastError()); return ORTE_ERROR; }#endif OBJ_CONSTRUCT(&mca_oob_tcp_component.tcp_subscriptions, opal_list_t); OBJ_CONSTRUCT(&mca_oob_tcp_component.tcp_peer_list, opal_list_t); OBJ_CONSTRUCT(&mca_oob_tcp_component.tcp_peers, opal_hash_table_t); OBJ_CONSTRUCT(&mca_oob_tcp_component.tcp_peer_names, opal_hash_table_t); OBJ_CONSTRUCT(&mca_oob_tcp_component.tcp_peer_free, opal_free_list_t); OBJ_CONSTRUCT(&mca_oob_tcp_component.tcp_msgs, opal_free_list_t); OBJ_CONSTRUCT(&mca_oob_tcp_component.tcp_lock, opal_mutex_t); OBJ_CONSTRUCT(&mca_oob_tcp_component.tcp_events, opal_list_t); OBJ_CONSTRUCT(&mca_oob_tcp_component.tcp_msg_post, opal_list_t); OBJ_CONSTRUCT(&mca_oob_tcp_component.tcp_msg_recv, opal_list_t); OBJ_CONSTRUCT(&mca_oob_tcp_component.tcp_msg_completed, opal_list_t); OBJ_CONSTRUCT(&mca_oob_tcp_component.tcp_match_lock, opal_mutex_t); OBJ_CONSTRUCT(&mca_oob_tcp_component.tcp_match_cond, opal_condition_t); 
OBJ_CONSTRUCT(&mca_oob_tcp_component.tcp_listen_thread, opal_thread_t); OBJ_CONSTRUCT(&mca_oob_tcp_component.tcp_available_devices, opal_list_t); OBJ_CONSTRUCT(&mca_oob_tcp_component.tcp_pending_connections_fl, opal_free_list_t); OBJ_CONSTRUCT(&mca_oob_tcp_component.tcp_pending_connections, opal_list_t); OBJ_CONSTRUCT(&mca_oob_tcp_component.tcp_copy_out_connections, opal_list_t); OBJ_CONSTRUCT(&mca_oob_tcp_component.tcp_copy_in_connections, opal_list_t); OBJ_CONSTRUCT(&mca_oob_tcp_component.tcp_connections_return, opal_list_t); OBJ_CONSTRUCT(&mca_oob_tcp_component.tcp_connections_return_copy, opal_list_t); OBJ_CONSTRUCT(&mca_oob_tcp_component.tcp_pending_connections_lock, opal_mutex_t); /* register oob module parameters */ mca_oob_tcp_component.tcp_peer_limit = mca_oob_tcp_param_register_int("peer_limit", -1); mca_oob_tcp_component.tcp_peer_retries = mca_oob_tcp_param_register_int("peer_retries", 60); mca_oob_tcp_component.tcp_debug = mca_oob_tcp_param_register_int("debug", 0); mca_oob_tcp_component.tcp_sndbuf = mca_oob_tcp_param_register_int("sndbuf", 128*1024); mca_oob_tcp_component.tcp_rcvbuf = mca_oob_tcp_param_register_int("rcvbuf", 128*1024); mca_base_param_reg_string(&mca_oob_tcp_component.super.oob_base, "if_include", "Comma-delimited list of TCP interfaces to use", false, false, NULL, &mca_oob_tcp_component.tcp_include); mca_base_param_reg_string(&mca_oob_tcp_component.super.oob_base, "include", "Obsolete synonym for oob_tcp_if_include", true, false, NULL, &str); if (NULL != str) { if (NULL == mca_oob_tcp_component.tcp_include) { mca_oob_tcp_component.tcp_include = str; } else { free(str); } } mca_base_param_reg_string(&mca_oob_tcp_component.super.oob_base, "if_exclude", "Comma-delimited list of TCP interfaces to exclude", false, false, NULL, &mca_oob_tcp_component.tcp_exclude); mca_base_param_reg_string(&mca_oob_tcp_component.super.oob_base, "exclude", "Obsolete synonym for oob_tcp_if_exclude", true, false, NULL, &str); if (NULL != str) { if (NULL == 
mca_oob_tcp_component.tcp_exclude) { mca_oob_tcp_component.tcp_exclude = str; } else { free(str); } } mca_base_param_reg_int(&mca_oob_tcp_component.super.oob_base, "connect_sleep", "Enable (1) / disable (0) random sleep for connection wireup", false, false, 1, &mca_oob_tcp_component.connect_sleep); mca_base_param_reg_string(&mca_oob_tcp_component.super.oob_base, "listen_mode", "Mode for HNP to accept incoming connections: event, listen_thread", false, false, "event", &listen_type); if (0 == strcmp(listen_type, "event")) { mca_oob_tcp_component.tcp_listen_type = OOB_TCP_EVENT; } else if (0 == strcmp(listen_type, "listen_thread")) { mca_oob_tcp_component.tcp_listen_type = OOB_TCP_LISTEN_THREAD; } else { opal_output(0, "Invalid value for oob_tcp_listen_mode parameter: %s", listen_type); return ORTE_ERROR; } mca_base_param_reg_int(&mca_oob_tcp_component.super.oob_base, "listen_thread_max_queue", "High water mark for queued accepted socket list size", false, false, 10, &mca_oob_tcp_component.tcp_copy_max_size); mca_base_param_reg_int(&mca_oob_tcp_component.super.oob_base, "listen_thread_max_time", "Maximum amount of time (in milliseconds) to wait between processing accepted socket list", false, false, 10, &tmp);#if OPAL_TIMER_USEC_NATIVE mca_oob_tcp_component.tcp_copy_delta = tmp * 1000;#else mca_oob_tcp_component.tcp_copy_delta = tmp * opal_timer_base_get_freq() / 1000;#endif mca_base_param_reg_int(&mca_oob_tcp_component.super.oob_base, "accept_spin_count", "Number of times to let accept return EWOULDBLOCK before updating accepted socket list", false, false, 10, &mca_oob_tcp_component.tcp_copy_spin_count); /* initialize state */ mca_oob_tcp_component.tcp_shutdown = false; mca_oob_tcp_component.tcp_listen_sd = -1; mca_oob_tcp_component.tcp_match_count = 0; mca_oob_tcp_component.tcp_last_copy_time = 0; return ORTE_SUCCESS;}/* * Cleanup of global variables used by this module. 
*/int mca_oob_tcp_component_close(void){ opal_list_item_t *item;#ifdef __WINDOWS__ WSACleanup();#endif /* cleanup resources */ while (NULL != (item = opal_list_remove_first(&mca_oob_tcp_component.tcp_available_devices))) { OBJ_RELEASE(item); } OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_available_devices); OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_pending_connections_lock); OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_connections_return_copy); OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_connections_return); OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_copy_out_connections); OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_pending_connections); OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_pending_connections_fl); OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_listen_thread); OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_match_cond); OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_match_lock); OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_msg_completed); OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_msg_recv); OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_msg_post); OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_events); OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_lock); OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_msgs); OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_peer_free); OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_peer_names); OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_peers); OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_subscriptions); OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_peer_list); return ORTE_SUCCESS;}/* * Called by mca_oob_tcp_recv_handler() when the TCP listen * socket has pending connection requests. Accept incoming * requests and queue for completion of the connection handshake.*/static void mca_oob_tcp_accept(void){ while(true) { opal_socklen_t addrlen = sizeof(struct sockaddr_in); struct sockaddr_in addr; mca_oob_tcp_event_t* event;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -