📄 sock_iocp.c
字号:
&listen_state->read.num_bytes, &listen_state->read.ovl)) { error = WSAGetLastError(); if (error == ERROR_IO_PENDING) return TRUE; printf("AcceptEx failed with error %d\n", error); return FALSE; } return TRUE;}/* sock functions */static BlockAllocator g_StateAllocator;int sock_init(){ char *szNum; WSADATA wsaData; int err; MPIDI_STATE_DECL(MPID_STATE_SOCK_INIT); MPIDI_FUNC_ENTER(MPID_STATE_SOCK_INIT); /* Start the Winsock dll */ if ((err = WSAStartup(MAKEWORD(2, 0), &wsaData)) != 0) { MPIDI_FUNC_EXIT(MPID_STATE_SOCK_INIT); return err; } /* get the connection retry value */ szNum = getenv("SOCK_CONNECT_TRIES"); if (szNum != NULL) { g_connection_attempts = atoi(szNum); if (g_connection_attempts < 1) g_connection_attempts = DEFAULT_NUM_RETRIES; } g_StateAllocator = BlockAllocInit(sizeof(sock_state_t), 1000, 500, malloc, free); MPIDI_FUNC_EXIT(MPID_STATE_SOCK_INIT); return SOCK_SUCCESS;}int sock_finalize(){ MPIDI_STATE_DECL(MPID_STATE_SOCK_FINALIZE); MPIDI_FUNC_ENTER(MPID_STATE_SOCK_FINALIZE); WSACleanup(); BlockAllocFinalize(&g_StateAllocator); MPIDI_FUNC_EXIT(MPID_STATE_SOCK_FINALIZE); return SOCK_SUCCESS;}int sock_create_set(sock_set_t *set){ HANDLE port; MPIDI_STATE_DECL(MPID_STATE_SOCK_CREATE_SET); MPIDI_FUNC_ENTER(MPID_STATE_SOCK_CREATE_SET); port = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, g_num_cp_threads); if (port != NULL) { *set = port; MPIDI_FUNC_EXIT(MPID_STATE_SOCK_CREATE_SET); return SOCK_SUCCESS; } MPIDI_FUNC_EXIT(MPID_STATE_SOCK_CREATE_SET); return SOCK_FAIL;}int sock_destroy_set(sock_set_t set){ BOOL b; MPIDI_STATE_DECL(MPID_STATE_SOCK_DESTROY_SET); MPIDI_FUNC_ENTER(MPID_STATE_SOCK_DESTROY_SET); b = CloseHandle(set); if (b == TRUE) { MPIDI_FUNC_EXIT(MPID_STATE_SOCK_DESTROY_SET); return SOCK_SUCCESS; } MPIDI_FUNC_EXIT(MPID_STATE_SOCK_DESTROY_SET); return SOCK_FAIL;}static int listening = 0;int sock_listen(sock_set_t set, void * user_ptr, int *port, sock_t *listener){ char host[100]; DWORD num_read = 0; sock_state_t * listen_state; MPIDI_STATE_DECL(MPID_STATE_SOCK_LISTEN); MPIDI_FUNC_ENTER(MPID_STATE_SOCK_LISTEN); if (listening) { MPIDI_FUNC_EXIT(MPID_STATE_SOCK_LISTEN); return SOCK_FAIL; } listening = 1; listen_state = (sock_state_t*)BlockAlloc(g_StateAllocator); init_state_struct(listen_state); if (easy_create(&listen_state->listen_sock, *port, INADDR_ANY) == SOCKET_ERROR) { MPIDI_FUNC_EXIT(MPID_STATE_SOCK_LISTEN); return SOCK_FAIL; } if (listen(listen_state->listen_sock, SOMAXCONN) == SOCKET_ERROR) { MPIDI_FUNC_EXIT(MPID_STATE_SOCK_LISTEN); return SOCK_FAIL; } if (CreateIoCompletionPort((HANDLE)listen_state->listen_sock, set, (DWORD)listen_state, g_num_cp_threads) == NULL) { MPIDI_FUNC_EXIT(MPID_STATE_SOCK_LISTEN); return SOCK_FAIL; } easy_get_sock_info(listen_state->listen_sock, host, port); listen_state->user_ptr = user_ptr; listen_state->type = SOCK_LISTENER; /* do this last to make sure the listener state structure is completely initialized before a completion thread has the chance to satisfy the AcceptEx call */ if (!post_next_accept(listen_state)) { MPIDI_FUNC_EXIT(MPID_STATE_SOCK_LISTEN); return SOCK_FAIL; } *listener = listen_state; MPIDI_FUNC_EXIT(MPID_STATE_SOCK_LISTEN); return SOCK_SUCCESS;}int sock_post_connect(sock_set_t set, void * user_ptr, char *host, int port, sock_t *connected){ struct hostent *lphost; struct sockaddr_in sockAddr; sock_state_t *connect_state; MPIDI_STATE_DECL(MPID_STATE_SOCK_POST_CONNECT); MPIDI_FUNC_ENTER(MPID_STATE_SOCK_POST_CONNECT); memset(&sockAddr,0,sizeof(sockAddr)); /* setup the structures */ connect_state = (sock_state_t*)BlockAlloc(g_StateAllocator); init_state_struct(connect_state); sockAddr.sin_family = AF_INET; sockAddr.sin_addr.s_addr = inet_addr(host); if (sockAddr.sin_addr.s_addr == INADDR_NONE || sockAddr.sin_addr.s_addr == 0) { lphost = gethostbyname(host); if (lphost != NULL) sockAddr.sin_addr.s_addr = ((struct in_addr *)lphost->h_addr)->s_addr; else { MPIDI_FUNC_EXIT(MPID_STATE_SOCK_POST_CONNECT); return SOCKET_ERROR; } } sockAddr.sin_port = htons((u_short)port); /* create a socket */ if (easy_create(&connect_state->sock, ADDR_ANY, INADDR_ANY) == SOCKET_ERROR) { MPIDI_FUNC_EXIT(MPID_STATE_SOCK_POST_CONNECT); return SOCK_FAIL; } /* connect */ if (connect(connect_state->sock, (SOCKADDR*)&sockAddr, sizeof(sockAddr)) == SOCKET_ERROR) { MPIDI_FUNC_EXIT(MPID_STATE_SOCK_POST_CONNECT); return SOCK_FAIL; } connect_state->user_ptr = user_ptr; connect_state->type = SOCK_SOCKET; connect_state->state = SOCK_CONNECTING; connect_state->set = set; /* associate the socket with the completion port */ if (CreateIoCompletionPort((HANDLE)connect_state->sock, set, (DWORD)connect_state, g_num_cp_threads) == NULL) { MPIDI_FUNC_EXIT(MPID_STATE_SOCK_POST_CONNECT); return SOCK_FAIL; } connect_state->pending_operations++; /* post a completion event so the sock_post_connect can be notified through sock_wait */ PostQueuedCompletionStatus(set, 0, (DWORD)connect_state, &connect_state->write.ovl); *connected = connect_state; MPIDI_FUNC_EXIT(MPID_STATE_SOCK_POST_CONNECT); return SOCK_SUCCESS;}int sock_accept(sock_set_t set, void * user_ptr, sock_t listener, sock_t *accepted){ BOOL b; struct linger linger; int optval, len; sock_state_t *accept_state; MPIDI_STATE_DECL(MPID_STATE_SOCK_ACCEPT); MPIDI_FUNC_ENTER(MPID_STATE_SOCK_ACCEPT); if (!(listener->state & SOCK_ACCEPTED)) { *accepted = NULL; MPIDI_FUNC_EXIT(MPID_STATE_SOCK_ACCEPT); return SOCK_FAIL; } accept_state = BlockAlloc(g_StateAllocator); if (accept_state == NULL) { *accepted = NULL; MPIDI_FUNC_EXIT(MPID_STATE_SOCK_ACCEPT); return SOCK_FAIL; } init_state_struct(accept_state); accept_state->type = SOCK_SOCKET; accept_state->sock = listener->sock; if (!post_next_accept(listener)) { *accepted = NULL; MPIDI_FUNC_EXIT(MPID_STATE_SOCK_ACCEPT); return SOCK_FAIL; } /* set the linger option */ linger.l_onoff = 1; linger.l_linger = 60; setsockopt(accept_state->sock, SOL_SOCKET, SO_LINGER, (char*)&linger, sizeof(linger)); /* set the socket buffers */ len = sizeof(int); if (!getsockopt(accept_state->sock, SOL_SOCKET, SO_RCVBUF, (char*)&optval, &len)) { optval = 64*1024; setsockopt(accept_state->sock, SOL_SOCKET, SO_RCVBUF, (char*)&optval, sizeof(int)); } len = sizeof(int); if (!getsockopt(accept_state->sock, SOL_SOCKET, SO_SNDBUF, (char*)&optval, &len)) { optval = 64*1024; setsockopt(accept_state->sock, SOL_SOCKET, SO_SNDBUF, (char*)&optval, sizeof(int)); } /* set the no-delay option */ b = TRUE; setsockopt(accept_state->sock, IPPROTO_TCP, TCP_NODELAY, (char*)&b, sizeof(BOOL)); /* prevent the socket from being inherited by child processes */ DuplicateHandle( GetCurrentProcess(), (HANDLE)accept_state->sock, GetCurrentProcess(), (HANDLE*)&accept_state->sock, 0, FALSE, DUPLICATE_CLOSE_SOURCE | DUPLICATE_SAME_ACCESS); /* associate the socket with the completion port */ if (CreateIoCompletionPort((HANDLE)accept_state->sock, set, (DWORD)accept_state, g_num_cp_threads) == NULL) { MPIDI_FUNC_EXIT(MPID_STATE_SOCK_ACCEPT); return SOCK_FAIL; } accept_state->user_ptr = user_ptr; accept_state->set = set; *accepted = accept_state; MPIDI_FUNC_EXIT(MPID_STATE_SOCK_ACCEPT); return SOCK_SUCCESS;}int sock_post_close(sock_t sock){ MPIDI_STATE_DECL(MPID_STATE_SOCK_POST_CLOSE); MPIDI_FUNC_ENTER(MPID_STATE_SOCK_POST_CLOSE); sock->closing = TRUE; if (sock->pending_operations == 0) { shutdown(sock->sock, SD_BOTH); closesocket(sock->sock); sock->sock = SOCK_INVALID_SOCKET; PostQueuedCompletionStatus(sock->set, 0, (DWORD)sock, NULL); } MPIDI_FUNC_EXIT(MPID_STATE_SOCK_POST_CLOSE); return SOCK_SUCCESS;}int sock_wait(sock_set_t set, int millisecond_timeout, sock_wait_t *out){ int error; DWORD num_bytes; sock_state_t *sock; OVERLAPPED *ovl; DWORD dwFlags = 0; MPIDI_STATE_DECL(MPID_STATE_SOCK_WAIT); MPIDI_FUNC_ENTER(MPID_STATE_SOCK_WAIT); for (;;) { if (GetQueuedCompletionStatus(set, &num_bytes, (DWORD*)&sock, &ovl, millisecond_timeout)) { if (sock->type == SOCK_SOCKET) { if (sock->closing && sock->pending_operations == 0) { out->num_bytes = 0; out->error = 0; out->op_type = SOCK_OP_CLOSE; out->user_ptr = sock->user_ptr; /*BlockFree(g_sock_allocator, sock);*/ MPIDI_FUNC_EXIT(MPID_STATE_SOCK_WAIT); return SOCK_SUCCESS; } if (ovl == &sock->read.ovl) { sock->read.total += num_bytes; if (sock->read.use_iov) { while (num_bytes) { if (sock->read.iov[sock->read.index].SOCK_IOV_LEN <= num_bytes) { num_bytes -= sock->read.iov[sock->read.index].SOCK_IOV_LEN; sock->read.index++; sock->read.iovlen--; } else { sock->read.iov[sock->read.index].SOCK_IOV_LEN -= num_bytes; sock->read.iov[sock->read.index].SOCK_IOV_BUF = (char*)(sock->read.iov[sock->read.index].SOCK_IOV_BUF) + num_bytes; num_bytes = 0; } } if (sock->read.iovlen == 0) { out->num_bytes = sock->read.total; out->op_type = SOCK_OP_READ; out->user_ptr = sock->user_ptr; sock->pending_operations--; if (sock->closing && sock->pending_operations == 0) { MPIU_dbg_printf("sock_wait: closing socket(%d) after iov read completed.\n", sock_getid(sock)); shutdown(sock->sock, SD_BOTH); closesocket(sock->sock); sock->sock = SOCK_INVALID_SOCKET; } MPIDI_FUNC_EXIT(MPID_STATE_SOCK_WAIT); return SOCK_SUCCESS; } /* make the user upcall */ if (sock->read.progress_update != NULL) sock->read.progress_update(num_bytes, sock->user_ptr); /* post a read of the remaining data */ WSARecv(sock->sock, sock->read.iov, sock->read.iovlen, &sock->read.num_bytes, &dwFlags, &sock->read.ovl, NULL); } else { sock->read.buffer = (char*)(sock->read.buffer) + num_bytes; sock->read.bufflen -= num_bytes; if (sock->read.bufflen == 0) { out->num_bytes = sock->read.total; out->op_type = SOCK_OP_READ; out->user_ptr = sock->user_ptr;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -