📄 sock.c
字号:
MPIDI_FUNC_EXIT(MPID_STATE_MPIDU_SOCK_CREATE_SET); return mpi_errno; } port = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, g_num_cp_threads); if (port != NULL) { *set = port; MPIDI_FUNC_EXIT(MPID_STATE_MPIDU_SOCK_CREATE_SET); return MPI_SUCCESS; } mpi_errno = GetLastError(); mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPIDU_SOCK_ERR_FAIL, "**iocp", "**iocp %s %d", get_error_string(mpi_errno), mpi_errno); MPIDI_FUNC_EXIT(MPID_STATE_MPIDU_SOCK_CREATE_SET); return mpi_errno;}#undef FUNCNAME#define FUNCNAME MPIDU_Sock_destroy_set#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int MPIDU_Sock_destroy_set(MPIDU_Sock_set_t set){ int mpi_errno; MPIDI_STATE_DECL(MPID_STATE_MPIDU_SOCK_DESTROY_SET); MPIDI_FUNC_ENTER(MPID_STATE_MPIDU_SOCK_DESTROY_SET); if (!g_init_called) { mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPIDU_SOCK_ERR_INIT, "**sock_init", 0); MPIDI_FUNC_EXIT(MPID_STATE_MPIDU_SOCK_DESTROY_SET); return mpi_errno; } if (!CloseHandle(set)) { mpi_errno = GetLastError(); if (mpi_errno == ERROR_INVALID_HANDLE) { mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPIDU_SOCK_ERR_BAD_SET, "**bad_set", 0); MPIDI_FUNC_EXIT(MPID_STATE_MPIDU_SOCK_DESTROY_SET); return mpi_errno; } mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPIDU_SOCK_ERR_FAIL, "**fail", "**fail %s %d", get_error_string(mpi_errno), mpi_errno); MPIDI_FUNC_EXIT(MPID_STATE_MPIDU_SOCK_DESTROY_SET); return mpi_errno; } MPIDI_FUNC_EXIT(MPID_STATE_MPIDU_SOCK_DESTROY_SET); return MPI_SUCCESS;}#undef FUNCNAME#define FUNCNAME MPIDU_Sock_native_to_sock#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int MPIDU_Sock_native_to_sock(MPIDU_Sock_set_t set, MPIDU_SOCK_NATIVE_FD fd, void *user_ptr, MPIDU_Sock_t *sock_ptr){ int mpi_errno; /*int ret_val;*/ sock_state_t *sock_state; /*u_long optval;*/ MPIDI_STATE_DECL(MPID_STATE_MPIDU_SOCK_NATIVE_TO_SOCK); MPIDI_FUNC_ENTER(MPID_STATE_MPIDU_SOCK_NATIVE_TO_SOCK); if (!g_init_called) { mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPIDU_SOCK_ERR_INIT, "**sock_init", 0); MPIDI_FUNC_EXIT(MPID_STATE_MPIDU_SOCK_NATIVE_TO_SOCK); return mpi_errno; } /* setup the structures */ sock_state = (sock_state_t*)MPIU_Malloc(sizeof(sock_state_t)); if (sock_state == NULL) { mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPIDU_SOCK_ERR_NOMEM, "**nomem", 0); MPIDI_FUNC_EXIT(MPID_STATE_MPIDU_SOCK_NATIVE_TO_SOCK); return mpi_errno; } init_state_struct(sock_state); sock_state->sock = (SOCKET)fd; /* set the socket to non-blocking */ /* leave the native handle in the state passed in? optval = TRUE; ioctlsocket(sock_state->sock, FIONBIO, &optval); */ sock_state->user_ptr = user_ptr; sock_state->type = SOCKI_SOCKET; sock_state->state = 0; sock_state->set = set; /* associate the socket with the completion port */ /*printf("CreateIOCompletionPort(%d, %p, %p, %d)\n", sock_state->sock, set, sock_state, g_num_cp_threads);fflush(stdout);*/ if (CreateIoCompletionPort((HANDLE)sock_state->sock, set, (ULONG_PTR)sock_state, g_num_cp_threads) == NULL) { mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPIDU_SOCK_ERR_NOMEM, "**nomem", 0); MPIDI_FUNC_EXIT(MPID_STATE_MPIDU_SOCK_NATIVE_TO_SOCK); return mpi_errno; } *sock_ptr = sock_state; /*printf("native socket %d\n", sock_state->sock);fflush(stdout);*/ MPIDI_FUNC_EXIT(MPID_STATE_MPIDU_SOCK_NATIVE_TO_SOCK); return MPI_SUCCESS;}#undef FUNCNAME#define FUNCNAME MPIDU_Sock_listen#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int MPIDU_Sock_listen(MPIDU_Sock_set_t set, void * user_ptr, int * port, MPIDU_Sock_t * sock){ int mpi_errno; char host[100]; sock_state_t * listen_state, **listener_copies; int i; MPIDI_STATE_DECL(MPID_STATE_MPIDU_SOCK_LISTEN); MPIDI_STATE_DECL(MPID_STATE_MEMCPY); MPIDI_FUNC_ENTER(MPID_STATE_MPIDU_SOCK_LISTEN); if (!g_init_called) { mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPIDU_SOCK_ERR_INIT, "**sock_init", 0); MPIDI_FUNC_EXIT(MPID_STATE_MPIDU_SOCK_LISTEN); return mpi_errno; } listen_state = (sock_state_t*)MPIU_Malloc(sizeof(sock_state_t)); init_state_struct(listen_state); mpi_errno = easy_create_ranged(&listen_state->listen_sock, *port, INADDR_ANY); if (mpi_errno != MPI_SUCCESS) { mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPIDU_SOCK_ERR_FAIL, "**sock_create", 0); MPIDI_FUNC_EXIT(MPID_STATE_MPIDU_SOCK_LISTEN); return mpi_errno; } if (listen(listen_state->listen_sock, SOMAXCONN) == SOCKET_ERROR) { mpi_errno = WSAGetLastError(); mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPIDU_SOCK_ERR_FAIL, "**listen", "**listen %s %d", get_error_string(mpi_errno), mpi_errno); MPIDI_FUNC_EXIT(MPID_STATE_MPIDU_SOCK_LISTEN); return mpi_errno; } if (CreateIoCompletionPort((HANDLE)listen_state->listen_sock, set, (ULONG_PTR)listen_state, g_num_cp_threads) == NULL) { mpi_errno = GetLastError(); mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPIDU_SOCK_ERR_FAIL, "**iocp", "**iocp %s %d", get_error_string(mpi_errno), mpi_errno); MPIDI_FUNC_EXIT(MPID_STATE_MPIDU_SOCK_LISTEN); return mpi_errno; } easy_get_sock_info(listen_state->listen_sock, host, port); listen_state->user_ptr = user_ptr; listen_state->type = SOCKI_LISTENER; listen_state->set = set; /* post the accept(s) last to make sure the listener state structure is completely initialized before a completion thread has the chance to satisfy the AcceptEx call */ listener_copies = (sock_state_t**)MPIU_Malloc(g_num_posted_accepts * sizeof(sock_state_t*)); for (i=0; i<g_num_posted_accepts; i++) { listener_copies[i] = (sock_state_t*)MPIU_Malloc(sizeof(sock_state_t)); MPIDI_FUNC_ENTER(MPID_STATE_MEMCPY); memcpy(listener_copies[i], listen_state, sizeof(*listen_state)); MPIDI_FUNC_EXIT(MPID_STATE_MEMCPY); if (i > 0) { listener_copies[i]->next = listener_copies[i-1]; } mpi_errno = post_next_accept(listener_copies[i]); if (mpi_errno != MPI_SUCCESS) { MPIU_Free(listener_copies); mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPIDU_SOCK_ERR_FAIL, "**post_accept", 0); MPIDI_FUNC_EXIT(MPID_STATE_MPIDU_SOCK_LISTEN); return mpi_errno; } } listen_state->list = listener_copies[g_num_posted_accepts-1]; MPIU_Free(listener_copies); *sock = listen_state; /*printf("listening socket %d\n", listen_state->listen_sock);fflush(stdout);*/ MPIDI_FUNC_EXIT(MPID_STATE_MPIDU_SOCK_LISTEN); return MPI_SUCCESS;}#undef FUNCNAME#define FUNCNAME MPIDU_Sock_accept#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int MPIDU_Sock_accept(MPIDU_Sock_t listener_sock, MPIDU_Sock_set_t set, void * user_ptr, MPIDU_Sock_t * sock){ int mpi_errno; BOOL b; /*struct linger linger;*/ u_long optval; int len; sock_state_t *accept_state, *iter; MPIDI_STATE_DECL(MPID_STATE_MPIDU_SOCK_ACCEPT); MPIDI_FUNC_ENTER(MPID_STATE_MPIDU_SOCK_ACCEPT); if (!g_init_called) { mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPIDU_SOCK_ERR_INIT, "**sock_init", 0); MPIDI_FUNC_EXIT(MPID_STATE_MPIDU_SOCK_ACCEPT); return mpi_errno; } accept_state = MPIU_Malloc(sizeof(sock_state_t)); if (accept_state == NULL) { *sock = NULL; mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPIDU_SOCK_ERR_NOMEM, "**nomem", 0); MPIDI_FUNC_EXIT(MPID_STATE_MPIDU_SOCK_ACCEPT); return mpi_errno; } init_state_struct(accept_state); accept_state->type = SOCKI_SOCKET; /* find the listener copy that satisfied the acceptex call and post another accept */ iter = listener_sock->list; while (iter != NULL && iter->accepted == 0) iter = iter->next; if (iter == NULL) { mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPIDU_SOCK_ERR_FAIL, "**sock_nop_accept", 0); MPIDI_FUNC_EXIT(MPID_STATE_MPIDU_SOCK_ACCEPT); return mpi_errno; } accept_state->sock = iter->sock; mpi_errno = post_next_accept(iter); if (mpi_errno != MPI_SUCCESS) { *sock = NULL; mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPIDU_SOCK_ERR_FAIL, "**post_accept", 0); MPIDI_FUNC_EXIT(MPID_STATE_MPIDU_SOCK_ACCEPT); return mpi_errno; } /* finish the accept */ setsockopt(accept_state->sock, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT, (char *)&listener_sock->listen_sock, sizeof(listener_sock->listen_sock)); /* set the socket to non-blocking */ optval = TRUE; ioctlsocket(accept_state->sock, FIONBIO, &optval); /* set the linger option */ /* linger.l_onoff = 1; linger.l_linger = 60; setsockopt(accept_state->sock, SOL_SOCKET, SO_LINGER, (char*)&linger, sizeof(linger)); */ /* set the socket buffers */ len = sizeof(int); if (!getsockopt(accept_state->sock, SOL_SOCKET, SO_RCVBUF, (char*)&optval, &len)) { optval = g_socket_rbuffer_size; setsockopt(accept_state->sock, SOL_SOCKET, SO_RCVBUF, (char*)&optval, sizeof(int)); } len = sizeof(int); if (!getsockopt(accept_state->sock, SOL_SOCKET, SO_SNDBUF, (char*)&optval, &len)) { optval = g_socket_sbuffer_size; setsockopt(accept_state->sock, SOL_SOCKET, SO_SNDBUF, (char*)&optval, sizeof(int)); } /* set the no-delay option */ b = TRUE; setsockopt(accept_state->sock, IPPROTO_TCP, TCP_NODELAY, (char*)&b, sizeof(BOOL)); /* prevent the socket from being inherited by child processes */ DuplicateHandle( GetCurrentProcess(), (HANDLE)accept_state->sock, GetCurrentProcess(), (HANDLE*)&accept_state->sock, 0, FALSE, DUPLICATE_CLOSE_SOURCE | DUPLICATE_SAME_ACCESS); /* associate the socket with the completion port */ if (CreateIoCompletionPort((HANDLE)accept_state->sock, set, (ULONG_PTR)accept_state, g_num_cp_threads) == NULL) { mpi_errno = GetLastError(); mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPIDU_SOCK_ERR_FAIL, "**iocp", "**iocp %s %d", get_error_string(mpi_errno), mpi_errno); MPIDI_FUNC_EXIT(MPID_STATE_MPIDU_SOCK_ACCEPT); return mpi_errno; } accept_state->user_ptr = user_ptr; accept_state->set = set; *sock = accept_state; /*printf("accepted socket %d\n", accept_state->sock);fflush(stdout);*/ MPIDI_FUNC_EXIT(MPID_STATE_MPIDU_SOCK_ACCEPT); return MPI_SUCCESS;}static unsigned int GetIP(char *pszIP){ unsigned int nIP; unsigned int a,b,c,d; if (pszIP == NULL) return 0; sscanf(pszIP, "%u.%u.%u.%u", &a, &b, &c, &d); /*printf("mask: %u.%u.%u.%u\n", a, b, c, d);fflush(stdout);*/ nIP = (d << 24) | (c << 16) | (b << 8) | a; return nIP;}static unsigned int GetMask(char *pszMask){ int i, nBits; unsigned int nMask = 0; unsigned int a,b,c,d; if (pszMask == NULL) return 0; if (strstr(pszMask, ".")) { sscanf(pszMask, "%u.%u.%u.%u", &a, &b, &c, &d); /*printf("mask: %u.%u.%u.%u\n", a, b, c, d);fflush(stdout);*/ nMask = (d << 24) | (c << 16) | (b << 8) | a; } else { nBits = atoi(pszMask); for (i=0; i<nBits; i++) { nMask = nMask << 1; nMask = nMask | 0x1; } } /* unsigned int a, b, c, d; a = ((unsigned char *)(&nMask))[0]; b = ((unsigned char *)(&nMask))[1]; c = ((unsigned char *)(&nMask))[2]; d = ((unsigned char *)(&nMask))[3]; printf("mask: %u.%u.%u.%u\n", a, b, c, d);fflush(stdout); */ return nMask;}#undef FUNCNAME#define FUNCNAME MPIDU_Sock_post_connect#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int MPIDU_Sock_post_connect(MPIDU_Sock_set_t set, void * user_ptr, char * host_description, int port, MPIDU_Sock_t * sock){ int mpi_errno; struct hostent *lphost; struct sockaddr_in sockAddr; sock_state_t *connect_state; u_long optval; char host[100]; int i; int connected = 0; int connect_errno = MPI_SUCCESS; char pszNetMask[50]; char *pEnv, *token; unsigned int nNicNet=0, nNicMask=0; int use_subnet = 0; MPIDI_STATE_DECL(MPID_STATE_MPIDU_SOCK_POST_CONNECT); MPIDI_FUNC_ENTER(MPID_STATE_MPIDU_SOCK_POST_CONNECT); if (!g_init_called) { mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPIDU_SOCK_ERR_INIT, "**sock_init", 0); MPIDI_FUNC_EXIT(MPID_STATE_MPIDU_SOCK_POST_CONNECT); return mpi_errno; } if (strlen(host_description) > SOCKI_DESCRIPTION_LENGTH) { mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPIDU_SOCK_ERR_NOMEM, "**nomem", 0); MPIDI_FUNC_EXIT(MPID_STATE_MPIDU_SOCK_POST_CONNECT); return mpi_errno; } memset(&sockAddr,0,sizeof(sockAddr)); /* setup the structures */ connect_state = (sock_state_t*)MPIU_Malloc(sizeof(sock_state_t)); init_state_struct(connect_state); connect_state->cur_host = connect_state->host_description; MPIU_Strncpy(connect_state->host_description, host_description, SOCKI_DESCRIPTION_LENGTH); /* create a socket */ mpi_errno = easy_create(&connect_state->sock, ADDR_ANY, INADDR_ANY); if (mpi_errno != MPI_SUCCESS) { mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPIDU_SOCK_ERR_INIT, "**sock_create", 0); MPIDI_FUNC_EXIT(MPID_STATE_MPIDU_SOCK_POST_CONNECT); return mpi_errno; } /* check to see if a subnet was specified through the environment */
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -