📄 sock_iocp.c
字号:
if (easy_create(&listen_state->listen_sock, *port, INADDR_ANY) == SOCKET_ERROR) return SOCK_FAIL; if (listen(listen_state->listen_sock, SOMAXCONN) == SOCKET_ERROR) return SOCK_FAIL; if (CreateIoCompletionPort((HANDLE)listen_state->listen_sock, set, (DWORD)listen_state, g_num_cp_threads) == NULL) return SOCK_FAIL; easy_get_sock_info(listen_state->listen_sock, host, port); listen_state->user_ptr = user_ptr; listen_state->type = SOCK_LISTENER; /* do this last to make sure the listener state structure is completely initialized before a completion thread has the chance to satisfy the AcceptEx call */ if (!post_next_accept(listen_state)) return SOCK_FAIL; *listener = listen_state; return SOCK_SUCCESS;}int sock_post_connect(sock_set_t set, void * user_ptr, char *host, int port, sock_t *connected){ struct hostent *lphost; struct sockaddr_in sockAddr; sock_state_t *connect_state; memset(&sockAddr,0,sizeof(sockAddr)); /* setup the structures */ connect_state = (sock_state_t*)BlockAlloc(g_StateAllocator); init_state_struct(connect_state); sockAddr.sin_family = AF_INET; sockAddr.sin_addr.s_addr = inet_addr(host); if (sockAddr.sin_addr.s_addr == INADDR_NONE || sockAddr.sin_addr.s_addr == 0) { lphost = gethostbyname(host); if (lphost != NULL) sockAddr.sin_addr.s_addr = ((struct in_addr *)lphost->h_addr)->s_addr; else return SOCKET_ERROR; } sockAddr.sin_port = htons((u_short)port); /* create a socket */ if (easy_create(&connect_state->sock, ADDR_ANY, INADDR_ANY) == SOCKET_ERROR) return SOCK_FAIL; /* connect */ if (connect(connect_state->sock, (SOCKADDR*)&sockAddr, sizeof(sockAddr)) == SOCKET_ERROR) return SOCK_FAIL; connect_state->user_ptr = user_ptr; connect_state->type = SOCK_SOCKET; connect_state->state = SOCK_CONNECTING; /* associate the socket with the completion port */ if (CreateIoCompletionPort((HANDLE)connect_state->sock, set, (DWORD)connect_state, g_num_cp_threads) == NULL) return SOCK_FAIL; /* post a completion event so the sock_post_connect can be notified through sock_wait */ PostQueuedCompletionStatus(set, 0, (DWORD)connect_state, &connect_state->write.ovl); *connected = connect_state; return SOCK_SUCCESS;}int sock_accept(sock_set_t set, void * user_ptr, sock_t listener, sock_t *accepted){ BOOL b; struct linger linger; int optval, len; sock_state_t *accept_state; if (!(listener->state & SOCK_ACCEPTED)) { *accepted = NULL; return SOCK_FAIL; } accept_state = BlockAlloc(g_StateAllocator); if (accept_state == NULL) { *accepted = NULL; return SOCK_FAIL; } init_state_struct(accept_state); accept_state->type = SOCK_SOCKET; accept_state->sock = listener->sock; if (!post_next_accept(listener)) { *accepted = NULL; return SOCK_FAIL; } /* set the linger option */ linger.l_onoff = 1; linger.l_linger = 60; setsockopt(accept_state->sock, SOL_SOCKET, SO_LINGER, (char*)&linger, sizeof(linger)); /* set the socket buffers */ len = sizeof(int); if (!getsockopt(accept_state->sock, SOL_SOCKET, SO_RCVBUF, (char*)&optval, &len)) { optval = 64*1024; setsockopt(accept_state->sock, SOL_SOCKET, SO_RCVBUF, (char*)&optval, sizeof(int)); } len = sizeof(int); if (!getsockopt(accept_state->sock, SOL_SOCKET, SO_SNDBUF, (char*)&optval, &len)) { optval = 64*1024; setsockopt(accept_state->sock, SOL_SOCKET, SO_SNDBUF, (char*)&optval, sizeof(int)); } /* set the no-delay option */ b = TRUE; setsockopt(accept_state->sock, IPPROTO_TCP, TCP_NODELAY, (char*)&b, sizeof(BOOL)); /* prevent the socket from being inherited by child processes */ DuplicateHandle( GetCurrentProcess(), (HANDLE)accept_state->sock, GetCurrentProcess(), (HANDLE*)&accept_state->sock, 0, FALSE, DUPLICATE_CLOSE_SOURCE | DUPLICATE_SAME_ACCESS); /* associate the socket with the completion port */ if (CreateIoCompletionPort((HANDLE)accept_state->sock, set, (DWORD)accept_state, g_num_cp_threads) == NULL) return SOCK_FAIL; *accepted = accept_state; return SOCK_SUCCESS;}int sock_close(sock_t sock){ shutdown(sock->sock, SD_BOTH); closesocket(sock->sock); return SOCK_SUCCESS;}int sock_wait(sock_set_t set, int millisecond_timeout, sock_wait_t *out){ int error; DWORD num_bytes; sock_state_t *sock; OVERLAPPED *ovl; DWORD dwFlags = 0; for (;;) { if (GetQueuedCompletionStatus(set, &num_bytes, (DWORD*)&sock, &ovl, millisecond_timeout)) { if (sock->type == SOCK_SOCKET) { if (ovl == &sock->read.ovl) { sock->read.total += num_bytes; if (sock->read.use_iov) { while (num_bytes) { if (sock->read.iov[sock->read.index].SOCK_IOV_LEN <= num_bytes) { num_bytes -= sock->read.iov[sock->read.index].SOCK_IOV_LEN; sock->read.index++; sock->read.iovlen--; } else { sock->read.iov[sock->read.index].SOCK_IOV_LEN -= num_bytes; sock->read.iov[sock->read.index].SOCK_IOV_BUF = (char*)(sock->read.iov[sock->read.index].SOCK_IOV_BUF) + num_bytes; num_bytes = 0; } } if (sock->read.iovlen == 0) { out->num_bytes = sock->read.total; out->op_type = SOCK_OP_READ; out->user_ptr = sock->user_ptr; return SOCK_SUCCESS; } /* make the user upcall */ if (sock->read.progress_update != NULL) sock->read.progress_update(num_bytes, sock->user_ptr); /* post a read of the remaining data */ WSARecv(sock->sock, sock->read.iov, sock->read.iovlen, &sock->read.num_bytes, &dwFlags, &sock->read.ovl, NULL); } else { sock->read.buffer = (char*)(sock->read.buffer) + num_bytes; sock->read.bufflen -= num_bytes; if (sock->read.bufflen == 0) { out->num_bytes = sock->read.total; out->op_type = SOCK_OP_READ; out->user_ptr = sock->user_ptr; return SOCK_SUCCESS; } /* make the user upcall */ if (sock->read.progress_update != NULL) sock->read.progress_update(num_bytes, sock->user_ptr); /* post a read of the remaining data */ ReadFile((HANDLE)(sock->sock), sock->read.buffer, sock->read.bufflen, &sock->read.num_bytes, &sock->read.ovl); } } else if (ovl == &sock->write.ovl) { if (sock->state & SOCK_CONNECTING) { /* insert code here to determine that the connect succeeded */ /* ... */ sock->state ^= SOCK_CONNECTING; /* remove the SOCK_CONNECTING bit */ out->op_type = SOCK_OP_CONNECT; out->user_ptr = sock->user_ptr; return SOCK_SUCCESS; } else { sock->write.total += num_bytes; if (sock->write.use_iov) { while (num_bytes) { if (sock->write.iov[sock->write.index].SOCK_IOV_LEN <= num_bytes) { num_bytes -= sock->write.iov[sock->write.index].SOCK_IOV_LEN; sock->write.index++; sock->write.iovlen--; } else { sock->write.iov[sock->write.index].SOCK_IOV_LEN -= num_bytes; sock->write.iov[sock->write.index].SOCK_IOV_BUF = (char*)(sock->write.iov[sock->write.index].SOCK_IOV_BUF) + num_bytes; num_bytes = 0; } } if (sock->write.iovlen == 0) { out->num_bytes = sock->write.total; out->op_type = SOCK_OP_WRITE; out->user_ptr = sock->user_ptr; return SOCK_SUCCESS; } /* make the user upcall */ if (sock->write.progress_update != NULL) sock->write.progress_update(num_bytes, sock->user_ptr); /* post a write of the remaining data */ WSASend(sock->sock, sock->write.iov, sock->write.iovlen, &sock->write.num_bytes, 0, &sock->write.ovl, NULL); } else { sock->write.buffer = (char*)(sock->write.buffer) + num_bytes; sock->write.bufflen -= num_bytes; if (sock->write.bufflen == 0) { out->num_bytes = sock->write.total; out->op_type = SOCK_OP_WRITE; out->user_ptr = sock->user_ptr; return SOCK_SUCCESS; } /* make the user upcall */ if (sock->write.progress_update != NULL) sock->write.progress_update(num_bytes, sock->user_ptr); /* post a write of the remaining data */ WriteFile((HANDLE)(sock->sock), sock->write.buffer, sock->write.bufflen, &sock->write.num_bytes, &sock->write.ovl); } } } else { return SOCK_FAIL; } } else if (sock->type == SOCK_LISTENER) { sock->state |= SOCK_ACCEPTED; out->num_bytes = num_bytes; out->op_type = SOCK_OP_ACCEPT; out->user_ptr = sock->user_ptr; return SOCK_SUCCESS; } else { return SOCK_FAIL; } } else { error = GetLastError(); /* interpret error, return appropriate SOCK_ERR_... macro */ if (error == WAIT_TIMEOUT) { out->op_type = SOCK_OP_TIMEOUT; out->error = SOCK_ERR_TIMEOUT; return SOCK_SUCCESS; } return SOCK_FAIL; } }}int sock_set_user_ptr(sock_t sock, void *user_ptr){ if (sock == SOCK_INVALID_SOCKET) return SOCK_FAIL; sock->user_ptr = user_ptr; return SOCK_SUCCESS;}/* immediate functions */int sock_read(sock_t sock, void *buf, int len, int *num_read){ *num_read = recv(sock->sock, buf, len, 0); return SOCK_SUCCESS;}int sock_readv(sock_t sock, SOCK_IOV *iov, int n, int *num_read){ DWORD nFlags = 0; if (WSARecv(sock->sock, iov, n, num_read, &nFlags, NULL/*overlapped*/, NULL/*completion routine*/) == SOCKET_ERROR) { if (WSAGetLastError() != WSAEWOULDBLOCK) { return SOCKET_ERROR; } *num_read = 0; } return SOCK_SUCCESS;}int sock_write(sock_t sock, void *buf, int len, int *num_written){ *num_written = send(sock->sock, buf, len, 0); return SOCK_SUCCESS;}int sock_writev(sock_t sock, SOCK_IOV *iov, int n, int *num_written){ if (n == 0) return 0; if (WSASend(sock->sock, iov, n, num_written, 0, NULL/*overlapped*/, NULL/*completion routine*/) == SOCKET_ERROR) { if (WSAGetLastError() != WSAEWOULDBLOCK) return SOCKET_ERROR; } return SOCK_SUCCESS;}/* non-blocking functions */int sock_post_read(sock_t sock, void *buf, int len, int (*rfn)(int, void*)){ sock->read.total = 0; sock->read.buffer = buf; sock->read.bufflen = len; sock->read.use_iov = FALSE; sock->read.progress_update = rfn; sock->state |= SOCK_READING; ReadFile((HANDLE)(sock->sock), buf, len, &sock->read.num_bytes, &sock->read.ovl); return SOCK_SUCCESS;}int sock_post_readv(sock_t sock, SOCK_IOV *iov, int n, int (*rfn)(int, void*)){ DWORD flags = 0; sock->read.total = 0; sock->read.iov = iov; sock->read.iovlen = n; sock->read.index = 0; sock->read.use_iov = TRUE; sock->read.progress_update = rfn; sock->state |= SOCK_READING; WSARecv(sock->sock, iov, n, &sock->read.num_bytes, &flags, &sock->read.ovl, NULL); return SOCK_SUCCESS;}int sock_post_write(sock_t sock, void *buf, int len, int (*wfn)(int, void*)){ sock->write.total = 0; sock->write.buffer = buf; sock->write.bufflen = len; sock->write.use_iov = FALSE; sock->write.progress_update = wfn; sock->state |= SOCK_WRITING; WriteFile((HANDLE)(sock->sock), buf, len, &sock->write.num_bytes, &sock->write.ovl); return SOCK_SUCCESS;}int sock_post_writev(sock_t sock, SOCK_IOV *iov, int n, int (*wfn)(int, void*)){ sock->write.total = 0; sock->write.iov = iov; sock->write.iovlen = n; sock->write.index = 0; sock->write.use_iov = TRUE; sock->write.progress_update = wfn; sock->state |= SOCK_WRITING; WSASend(sock->sock, iov, n, &sock->write.num_bytes, 0, &sock->write.ovl, NULL); return SOCK_SUCCESS;}int sock_easy_receive(sock_t sock, void *buf, int len, int *num_read){ int error; int n; int total = 0; while (len) { error = sock_read(sock, buf, len, &n); if (error != SOCK_SUCCESS) { *num_read = total; return error; } total += n; buf = (char*)buf + n; len -= n; } return SOCK_SUCCESS;}int sock_easy_send(sock_t sock, void *buf, int len, int *num_written){ int error; int n; int total = 0; while (len) { error = sock_write(sock, buf, len, &n); if (error != SOCK_SUCCESS) { *num_written = total; return error; } total += n; buf = (char*)buf + n; len -= n; } return SOCK_SUCCESS;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -