📄 sock.c
字号:
/* -*- Mode: C; c-basic-offset:4 ; -*- */
/*
 *  (C) 2001 by Argonne National Laboratory.
 *      See COPYRIGHT in top-level directory.
 */

#include "mpiimpl.h"
#include "mpidu_sock.h"

/* Two-level stringification so that FUNCNAME is macro-expanded before quoting */
#define MPIDI_QUOTE(A) MPIDI_QUOTE2(A)
#define MPIDI_QUOTE2(A) #A

/* Parenthesized so the macro expands safely inside larger expressions */
#define SOCKI_TCP_BUFFER_SIZE       (32*1024)
#define SOCKI_DESCRIPTION_LENGTH    256
#define SOCKI_NUM_PREPOSTED_ACCEPTS 32
/* Size of the message buffers handled by translate_error/get_error_string */
#define SOCKI_ERROR_MSG_LENGTH      1024

typedef enum SOCKI_TYPE { SOCKI_INVALID, SOCKI_LISTENER, SOCKI_SOCKET, SOCKI_WAKEUP } SOCKI_TYPE;

/* Values stored in sock_state_t.state */
typedef int SOCKI_STATE;
#define SOCKI_ACCEPTING  0x0001
#define SOCKI_CONNECTING 0x0004
#define SOCKI_READING    0x0008
#define SOCKI_WRITING    0x0010

/* Per-direction (read or write) bookkeeping for an overlapped operation */
typedef struct sock_buffer
{
    DWORD num_bytes;
    OVERLAPPED ovl;
    MPID_IOV tiov;
#ifdef USE_SOCK_IOV_COPY
    MPID_IOV iov[MPID_IOV_MAXLEN];
#else
    MPID_IOV *iov;
#endif
    int iovlen;
    int index;
    MPIU_Size_t total;
    MPIDU_Sock_progress_update_func_t progress_update;
} sock_buffer;

typedef struct sock_state_t
{
    SOCKI_TYPE type;
    SOCKI_STATE state;
    SOCKET sock;
    MPIDU_Sock_set_t set;
    int closing;
    int pending_operations;
    /* listener/accept structures */
    SOCKET listen_sock;
    char accept_buffer[sizeof(struct sockaddr_in)*2+32];
    /* read and write structures */
    sock_buffer read;
    sock_buffer write;
    /* connect structures */
    char *cur_host;
    char host_description[SOCKI_DESCRIPTION_LENGTH];
    /* user pointer */
    void *user_ptr;
    /* internal list */
    struct sock_state_t *list, *next;
    int accepted;
    int listener_closed;
    /* timing variables for completion notification */
    /*
    double rt1, rt2;
    double wt1, wt2;
    double ct1, ct2;
    */
    /* link in the global g_sock_list chain (set by init_state_struct) */
    struct sock_state_t *next_sock;
} sock_state_t;

static int g_num_cp_threads = 2;
static int g_socket_buffer_size = SOCKI_TCP_BUFFER_SIZE;
static int g_socket_rbuffer_size = SOCKI_TCP_BUFFER_SIZE;
static int g_socket_sbuffer_size = SOCKI_TCP_BUFFER_SIZE;
static int g_init_called = 0;
static int g_num_posted_accepts = SOCKI_NUM_PREPOSTED_ACCEPTS;
static int g_min_port = 0;
static int g_max_port = 0;

/* head of the list of every state structure passed to init_state_struct */
static sock_state_t *g_sock_list = NULL;

/* empty structure used to wake up a sock_wait call */
sock_state_t g_wakeup_state;

/*
 * translate_error - turn a system error code into a one-line message.
 *
 *   error   - error code handed to FormatMessage
 *   msg     - output buffer; must hold at least SOCKI_ERROR_MSG_LENGTH bytes
 *   prepend - optional text placed in front of the system message (may be NULL)
 *
 * If FormatMessage produces nothing, msg receives only the prepend text (or
 * the empty string).  Otherwise msg receives prepend + system text, cut at the
 * first carriage return or line feed.
 */
static void translate_error(int error, char *msg, char *prepend)
{
    HLOCAL str;
    int num_bytes;

    num_bytes = FormatMessage(
        FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_ALLOCATE_BUFFER,
        0,
        error,
        MAKELANGID( LANG_NEUTRAL, SUBLANG_DEFAULT ),
        (LPTSTR) &str,
        0, 0);
    if (num_bytes == 0)
    {
        if (prepend != NULL)
            MPIU_Strncpy(msg, prepend, SOCKI_ERROR_MSG_LENGTH);
        else
            *msg = '\0';
        return;
    }

    if (prepend == NULL)
    {
        /* Clamp the copy: the system message may be longer than msg */
        size_t copy_len = (size_t)num_bytes;
        if (copy_len > SOCKI_ERROR_MSG_LENGTH - 1)
            copy_len = SOCKI_ERROR_MSG_LENGTH - 1;
        memcpy(msg, str, copy_len);
        msg[copy_len] = '\0';
    }
    else
    {
        MPIU_Snprintf(msg, SOCKI_ERROR_MSG_LENGTH, "%s%s", prepend, (const char*)str);
    }
    LocalFree(str);

    /* Cut at the first CR/LF without disturbing strtok's hidden state */
    msg[strcspn(msg, "\r\n")] = '\0';
}

/*
 * get_error_string - return the message text for error_code.
 *
 * The result points at a static buffer that is overwritten by the next call.
 */
static char *get_error_string(int error_code)
{
    /* obviously not thread safe to store a message like this */
    static char error_msg[SOCKI_ERROR_MSG_LENGTH];
    translate_error(error_code, error_msg, NULL);
    return error_msg;
}

#undef FUNCNAME
#define FUNCNAME easy_create_ranged
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
/*
 * easy_create_ranged - create an overlapped TCP socket and bind it.
 *
 *   sock - receives the new, non-inheritable socket handle on success
 *   port - port to bind; if 0 and both g_min_port and g_max_port are non-zero,
 *          ports g_min_port..g_max_port are tried in order until one binds
 *   addr - address stored in sin_addr.s_addr for the bind
 *
 * Returns MPI_SUCCESS or an error code built by MPIR_Err_create_code.  The
 * temporary socket is closed on every failure path.
 */
static int easy_create_ranged(SOCKET *sock, int port, unsigned long addr)
{
    int mpi_errno;
    /*struct linger linger;*/
    int optval, len;
    SOCKET temp_sock;
    SOCKADDR_IN sockAddr;
    int use_range = 0;

    /* create the overlapped socket */
    temp_sock = WSASocket(PF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);
    if (temp_sock == INVALID_SOCKET)
    {
        mpi_errno = WSAGetLastError();
        mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPIDU_SOCK_ERR_FAIL, "**socket", "**socket %s %d", get_error_string(mpi_errno), mpi_errno);
        return mpi_errno;
    }

    if (port == 0 && g_min_port != 0 && g_max_port != 0)
    {
        use_range = 1;
        port = g_min_port;
    }

    memset(&sockAddr,0,sizeof(sockAddr));
    sockAddr.sin_family = AF_INET;
    sockAddr.sin_addr.s_addr = addr;
    sockAddr.sin_port = htons((unsigned short)port);

    while (bind(temp_sock, (SOCKADDR*)&sockAddr, sizeof(sockAddr)) == SOCKET_ERROR)
    {
        if (!use_range)
        {
            /* read the error before closesocket can replace it */
            mpi_errno = WSAGetLastError();
            closesocket(temp_sock);
            mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPIDU_SOCK_ERR_FAIL, "**socket", "**socket %s %d", get_error_string(mpi_errno), mpi_errno);
            return mpi_errno;
        }

        /* move on to the next port in the configured range */
        port++;
        if (port > g_max_port)
        {
            closesocket(temp_sock);
            mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPIDU_SOCK_ERR_FAIL, "**socket", 0);
            return mpi_errno;
        }
        sockAddr.sin_port = htons((unsigned short)port);
    }

    /* Set the linger on close option */
    /*
    linger.l_onoff = 1 ;
    linger.l_linger = 60;
    setsockopt(temp_sock, SOL_SOCKET, SO_LINGER, (char*)&linger, sizeof(linger));
    */

    /* set the socket to non-blocking */
    /*
    optval = TRUE;
    ioctlsocket(temp_sock, FIONBIO, &optval);
    */

    /* set the socket buffers (only when the option can be queried) */
    len = sizeof(int);
    if (!getsockopt(temp_sock, SOL_SOCKET, SO_RCVBUF, (char*)&optval, &len))
    {
        optval = g_socket_rbuffer_size;
        setsockopt(temp_sock, SOL_SOCKET, SO_RCVBUF, (char*)&optval, sizeof(int));
    }
    len = sizeof(int);
    if (!getsockopt(temp_sock, SOL_SOCKET, SO_SNDBUF, (char*)&optval, &len))
    {
        optval = g_socket_sbuffer_size;
        setsockopt(temp_sock, SOL_SOCKET, SO_SNDBUF, (char*)&optval, sizeof(int));
    }

    /* prevent the socket from being inherited by child processes */
    if (!DuplicateHandle(
        GetCurrentProcess(), (HANDLE)temp_sock,
        GetCurrentProcess(), (HANDLE*)sock,
        0, FALSE,
        DUPLICATE_CLOSE_SOURCE | DUPLICATE_SAME_ACCESS))
    {
        /* DUPLICATE_CLOSE_SOURCE closes temp_sock even when the call fails,
           so it must not be closed a second time here */
        mpi_errno = GetLastError();
        mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPIDU_SOCK_ERR_FAIL, "**duphandle", "**duphandle %s %d", get_error_string(mpi_errno), mpi_errno);
        return mpi_errno;
    }

    /* Set the no-delay option; pass an explicit TRUE instead of whatever the
       buffer-size code above happened to leave in optval */
    optval = TRUE;
    setsockopt(*sock, IPPROTO_TCP, TCP_NODELAY, (char *)&optval, sizeof(optval));

    return MPI_SUCCESS;
}

#undef FUNCNAME
#define FUNCNAME easy_create
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
/*
 * easy_create - create an overlapped TCP socket and bind it to addr:port.
 *
 *   sock - receives the new, non-inheritable socket handle on success
 *   port - port to bind (passed to bind unchanged, so 0 is allowed)
 *   addr - address stored in sin_addr.s_addr for the bind
 *
 * Returns MPI_SUCCESS or an error code built by MPIR_Err_create_code.  The
 * temporary socket is closed on every failure path.
 */
static int easy_create(SOCKET *sock, int port, unsigned long addr)
{
    int mpi_errno;
    /*struct linger linger;*/
    int optval, len;
    SOCKET temp_sock;
    SOCKADDR_IN sockAddr;

    /* create the overlapped socket */
    temp_sock = WSASocket(PF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);
    if (temp_sock == INVALID_SOCKET)
    {
        mpi_errno = WSAGetLastError();
        mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPIDU_SOCK_ERR_FAIL, "**socket", "**socket %s %d", get_error_string(mpi_errno), mpi_errno);
        return mpi_errno;
    }

    memset(&sockAddr,0,sizeof(sockAddr));
    sockAddr.sin_family = AF_INET;
    sockAddr.sin_addr.s_addr = addr;
    sockAddr.sin_port = htons((unsigned short)port);

    if (bind(temp_sock, (SOCKADDR*)&sockAddr, sizeof(sockAddr)) == SOCKET_ERROR)
    {
        /* read the error before closesocket can replace it */
        mpi_errno = WSAGetLastError();
        closesocket(temp_sock);
        mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPIDU_SOCK_ERR_FAIL, "**socket", "**socket %s %d", get_error_string(mpi_errno), mpi_errno);
        return mpi_errno;
    }

    /* Set the linger on close option */
    /*
    linger.l_onoff = 1 ;
    linger.l_linger = 60;
    setsockopt(temp_sock, SOL_SOCKET, SO_LINGER, (char*)&linger, sizeof(linger));
    */

    /* set the socket to non-blocking */
    /*
    optval = TRUE;
    ioctlsocket(temp_sock, FIONBIO, &optval);
    */

    /* set the socket buffers (only when the option can be queried) */
    len = sizeof(int);
    if (!getsockopt(temp_sock, SOL_SOCKET, SO_RCVBUF, (char*)&optval, &len))
    {
        optval = g_socket_rbuffer_size;
        setsockopt(temp_sock, SOL_SOCKET, SO_RCVBUF, (char*)&optval, sizeof(int));
    }
    len = sizeof(int);
    if (!getsockopt(temp_sock, SOL_SOCKET, SO_SNDBUF, (char*)&optval, &len))
    {
        optval = g_socket_sbuffer_size;
        setsockopt(temp_sock, SOL_SOCKET, SO_SNDBUF, (char*)&optval, sizeof(int));
    }

    /* prevent the socket from being inherited by child processes */
    if (!DuplicateHandle(
        GetCurrentProcess(), (HANDLE)temp_sock,
        GetCurrentProcess(), (HANDLE*)sock,
        0, FALSE,
        DUPLICATE_CLOSE_SOURCE | DUPLICATE_SAME_ACCESS))
    {
        /* DUPLICATE_CLOSE_SOURCE closes temp_sock even when the call fails,
           so it must not be closed a second time here */
        mpi_errno = GetLastError();
        mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPIDU_SOCK_ERR_FAIL, "**duphandle", "**duphandle %s %d", get_error_string(mpi_errno), mpi_errno);
        return mpi_errno;
    }

    /* Set the no-delay option; pass an explicit TRUE instead of whatever the
       buffer-size code above happened to leave in optval */
    optval = TRUE;
    setsockopt(*sock, IPPROTO_TCP, TCP_NODELAY, (char *)&optval, sizeof(optval));

    return MPI_SUCCESS;
}

/*
 * easy_get_sock_info - report the local port of sock and this host's name.
 *
 *   sock - socket whose local address is queried
 *   name - receives the fully qualified DNS computer name; the size passed to
 *          GetComputerNameEx is 100, so the buffer is assumed to hold at
 *          least 100 characters (NOTE(review): confirm against callers)
 *   port - receives the local port in host byte order
 *
 * Always returns 0.  If a lookup fails the matching output is set to a defined
 * value (*port = 0, name = "") rather than left indeterminate.
 */
static inline int easy_get_sock_info(SOCKET sock, char *name, int *port)
{
    struct sockaddr_in addr;
    int name_len = sizeof(addr);
    DWORD len = 100;

    memset(&addr, 0, sizeof(addr));
    if (getsockname(sock, (struct sockaddr*)&addr, &name_len) == SOCKET_ERROR)
        *port = 0;
    else
        *port = ntohs(addr.sin_port);

    /*GetComputerName(name, &len);*/
    if (!GetComputerNameEx(ComputerNameDnsFullyQualified, name, &len))
        name[0] = '\0';
    /*gethostname(name, 100);*/
    return 0;
}

/*
 * init_state_struct - reset the fields of *p that this routine owns, create
 * the read and write completion events, and push p onto the front of
 * g_sock_list.
 *
 * cur_host, host_description and accept_buffer are not touched here.
 */
static inline void init_state_struct(sock_state_t *p)
{
    p->listen_sock = INVALID_SOCKET;
    p->sock = INVALID_SOCKET;
    p->set = INVALID_HANDLE_VALUE;
    p->user_ptr = NULL;
    p->type = SOCKI_INVALID;
    p->state = 0;
    p->closing = FALSE;
    p->pending_operations = 0;

    /* read side */
    p->read.total = 0;
    p->read.num_bytes = 0;
    p->read.tiov.MPID_IOV_BUF = NULL;
#ifndef USE_SOCK_IOV_COPY
    p->read.iov = NULL;
#endif
    p->read.iovlen = 0;
    p->read.index = 0;
    /* Zero the whole OVERLAPPED so no member is left uninitialized, then
       attach a manual-reset, initially non-signaled event */
    memset(&p->read.ovl, 0, sizeof(p->read.ovl));
    p->read.ovl.hEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
    p->read.progress_update = NULL;

    /* write side */
    p->write.total = 0;
    p->write.num_bytes = 0;
    p->write.tiov.MPID_IOV_BUF = NULL;
#ifndef USE_SOCK_IOV_COPY
    p->write.iov = NULL;
#endif
    p->write.iovlen = 0;
    p->write.index = 0;
    memset(&p->write.ovl, 0, sizeof(p->write.ovl));
    p->write.ovl.hEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
    p->write.progress_update = NULL;

    p->list = NULL;
    p->next = NULL;
    p->accepted = 0;
    p->listener_closed = 0;
    /*
    p->bogus_t1 = 0;
    p->bogus_t2 = 0;
    */

    /* link into the global list of state structures */
    p->next_sock = g_sock_list;
    g_sock_list = p;
}

#undef FUNCNAME
#define FUNCNAME post_next_accept
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
/*
 * post_next_accept - create a fresh overlapped socket in context->sock and
 * post an AcceptEx for it on context->listen_sock, using the read-side
 * OVERLAPPED of the context.  A pending AcceptEx counts as success.
 */
static inline int post_next_accept(sock_state_t * context)
{
    int mpi_errno;
    context->state = SOCKI_ACCEPTING;
    context->accepted = 0;
    /*printf("posting an accept.\n");fflush(stdout);*/
    context->sock = WSASocket(PF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);
    if (context->sock == INVALID_SOCKET)
    {
        mpi_errno = WSAGetLastError();
        mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPIDU_SOCK_ERR_FAIL, "**fail", "**fail %s %d", get_error_string(mpi_errno), mpi_errno);
        return mpi_errno;
    }
    if (!AcceptEx(
        context->listen_sock,
        context->sock,
        context->accept_buffer,
        0,
        sizeof(struct sockaddr_in)+16,
        sizeof(struct sockaddr_in)+16,
        &context->read.num_bytes,
        &context->read.ovl))
    {
        mpi_errno = WSAGetLastError();
        if (mpi_errno == ERROR_IO_PENDING)
            return MPI_SUCCESS;
        /*MPIU_Error_printf("AcceptEx failed with error %d\n", error);fflush(stdout);*/
        mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPIDU_SOCK_ERR_FAIL, "**fail", "**fail %s %d", get_error_string(mpi_errno), mpi_errno);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -