📄 socket.c
字号:
* Worker threads for servicing the I/O */ iocompletionport_createthreads(manager->maxIOCPThreads, manager);} voidiocompletionport_exit(isc_socketmgr_t *manager) { REQUIRE(VALID_MANAGER(manager)); if (manager->hIoCompletionPort != NULL) { /* Get each of the service threads to exit */ signal_iocompletionport_exit(manager); }}/* * Add sockets in here and pass the sock data in as part of the * information needed. */voidiocompletionport_update(isc_socket_t *sock) { HANDLE hiocp; REQUIRE(sock != NULL); if(sock->iocp == 0) { sock->iocp = 1; hiocp = CreateIoCompletionPort((HANDLE) sock->fd, sock->manager->hIoCompletionPort, (DWORD) sock, sock->manager->maxIOCPThreads); InterlockedIncrement(&iocp_total); }}voidsocket_event_minit(sock_event_list *evlist) { BOOL bReset; int i; REQUIRE(evlist != NULL); /* Initialize the Event List */ evlist->max_event = 0; evlist->total_events = 0; for (i = 0; i < MAX_EVENTS; i++) { evlist->aSockList[i] = NULL; evlist->aEventList[i] = (WSAEVENT) 0; } evlist->aEventList[0] = WSACreateEvent(); (evlist->max_event)++; bReset = WSAResetEvent(evlist->aEventList[0]);}/* * Event Thread Initialization */isc_result_tevent_thread_create(events_thread_t **evthreadp, isc_socketmgr_t *manager) { events_thread_t *evthread; REQUIRE(VALID_MANAGER(manager)); REQUIRE(evthreadp != NULL && *evthreadp == NULL); evthread = isc_mem_get(manager->mctx, sizeof(*evthread)); socket_event_minit(&evthread->sockev_list); ISC_LINK_INIT(evthread, link); evthread->manager = manager; ISC_LIST_APPEND(manager->ev_threads, evthread, link); /* * Start up the event wait thread. */ if (isc_thread_create(event_wait, evthread, &evthread->thread_handle) != ISC_R_SUCCESS) { isc_mem_put(manager->mctx, evthread, sizeof(*evthread)); UNEXPECTED_ERROR(__FILE__, __LINE__, "isc_thread_create() %s", isc_msgcat_get(isc_msgcat, ISC_MSGSET_GENERAL, ISC_MSG_FAILED, "failed")); return (ISC_R_UNEXPECTED); } *evthreadp = evthread; return (ISC_R_SUCCESS);}/* * Locate a thread with space for additional events or create one if * necessary. The manager is locked at this point so the information * cannot be changed by another thread while we are searching. */voidlocate_available_thread(isc_socketmgr_t *manager) { events_thread_t *evthread; DWORD threadid = GetCurrentThreadId(); evthread = ISC_LIST_HEAD(manager->ev_threads); while (evthread != NULL) { /* * We need to find a thread with space to add an event * If we find it, alert it to process the event change * list */ if(threadid != evthread->thread_id && evthread->sockev_list.max_event < MAX_EVENTS) { WSASetEvent(evthread->sockev_list.aEventList[0]); return; } evthread = ISC_LIST_NEXT(evthread, link); } /* * We need to create a new thread as other threads are full. * If we succeed in creating the thread, alert it to * process the event change list since it will have space. * If we are unable to create one, the event will stay on the * list and the next event_wait thread will try again to add * the event. It will call here again if it has no space. */ if (event_thread_create(&evthread, manager) == ISC_R_SUCCESS) { WSASetEvent(evthread->sockev_list.aEventList[0]); }}isc_boolean_tsocket_eventlist_add(event_change_t *evchange, sock_event_list *evlist, isc_socketmgr_t *manager) { int max_event; isc_socket_t *sock; REQUIRE(evchange != NULL); sock = evchange->sock; REQUIRE(sock != NULL); REQUIRE(sock->hEvent != NULL); REQUIRE(evlist != NULL); max_event = evlist->max_event; if(max_event >= MAX_EVENTS) { locate_available_thread(manager); return (ISC_FALSE); } evlist->aSockList[max_event] = sock; evlist->aEventList[max_event] = sock->hEvent; evlist->max_event++; evlist->total_events++; sock->hAlert = evlist->aEventList[0]; sock->evthread_id = GetCurrentThreadId(); return (ISC_TRUE);}/* * Note that the eventLock is locked before calling this function. * All Events and associated sockets are closed here. */isc_boolean_tsocket_eventlist_delete(event_change_t *evchange, sock_event_list *evlist) { int i; WSAEVENT hEvent; int iEvent = -1; REQUIRE(evchange != NULL); /* Make sure this is the right thread from which to delete the event */ if (evchange->evthread_id != GetCurrentThreadId()) return (ISC_FALSE); REQUIRE(evlist != NULL); REQUIRE(evchange->hEvent != NULL); hEvent = evchange->hEvent; /* Find the Event */ for (i = 1; i < evlist->max_event; i++) { if (evlist->aEventList[i] == hEvent) { iEvent = i; break; } } /* Actual event start at 1 */ if (iEvent < 1) return (ISC_FALSE); for(i = iEvent; i < (evlist->max_event - 1); i++) { evlist->aEventList[i] = evlist->aEventList[i + 1]; evlist->aSockList[i] = evlist->aSockList[i + 1]; } evlist->aEventList[evlist->max_event - 1] = 0; evlist->aSockList[evlist->max_event - 1] = NULL; /* Cleanup */ WSACloseEvent(hEvent); if (evchange->fd >= 0) closesocket(evchange->fd); evlist->max_event--; evlist->total_events--; return (ISC_TRUE);}/* * Get the event changes off of the list and apply the * requested changes. The manager lock is taken out at * the start of this function to prevent other event_wait * threads processing the same information at the same * time. The queue may not be empty on exit since other * threads may be involved in processing the queue. * * The deletes are done first in order that there be space * available for the events being added in the same thread * in case the event list is almost full. This reduces the * probability of having to create another thread which would * increase overhead costs. */isc_result_tprocess_eventlist(sock_event_list *evlist, isc_socketmgr_t *manager) { event_change_t *evchange; event_change_t *next; isc_boolean_t del; REQUIRE(evlist != NULL); LOCK(&manager->lock); /* * First the deletes. */ evchange = ISC_LIST_HEAD(manager->event_updates); while (evchange != NULL) { next = ISC_LIST_NEXT(evchange, link); del = ISC_FALSE; if (evchange->action == EVENT_DELETE) { del = socket_eventlist_delete(evchange, evlist); /* * Delete only if this thread's socket list was * updated. */ if (del) { ISC_LIST_DEQUEUE(manager->event_updates, evchange, link); HeapFree(hHeapHandle, 0, evchange); manager->event_written--; } } evchange = next; } /* * Now the adds. */ evchange = ISC_LIST_HEAD(manager->event_updates); while (evchange != NULL) { next = ISC_LIST_NEXT(evchange, link); del = ISC_FALSE; if (evchange->action == EVENT_ADD) { del = socket_eventlist_add(evchange, evlist, manager); /* * Delete only if this thread's socket list was * updated. */ if (del) { ISC_LIST_DEQUEUE(manager->event_updates, evchange, link); HeapFree(hHeapHandle, 0, evchange); manager->event_written--; } } evchange = next; } UNLOCK(&manager->lock); return (ISC_R_SUCCESS);}/* * Add the event list changes to the queue and notify the * event loop */static voidnotify_eventlist(isc_socket_t *sock, isc_socketmgr_t *manager, unsigned int action){ event_change_t *evchange; REQUIRE(VALID_MANAGER(manager)); REQUIRE(sock != NULL); evchange = HeapAlloc(hHeapHandle, HEAP_ZERO_MEMORY, sizeof(event_change_t)); evchange->sock = sock; evchange->action = action; evchange->hEvent = sock->hEvent; evchange->fd = sock->fd; evchange->evthread_id = sock->evthread_id; LOCK(&manager->lock); ISC_LIST_APPEND(manager->event_updates, evchange, link); sock->manager->event_written++; UNLOCK(&manager->lock); /* Alert the Wait List */ if (sock->hAlert != NULL) WSASetEvent(sock->hAlert); else WSASetEvent(manager->prime_alert);}/* * Note that the socket is already locked before calling this function */isc_result_tsocket_event_add(isc_socket_t *sock, long type) { int stat; WSAEVENT hEvent; char strbuf[ISC_STRERRORSIZE]; const char *msg; REQUIRE(sock != NULL); hEvent = WSACreateEvent(); if (hEvent == WSA_INVALID_EVENT) { stat = WSAGetLastError(); isc__strerror(stat, strbuf, sizeof(strbuf)); msg = isc_msgcat_get(isc_msgcat, ISC_MSGSET_GENERAL, ISC_MSG_FAILED, "failed"), UNEXPECTED_ERROR(__FILE__, __LINE__, "WSACreateEvent: %s: %s", msg, strbuf); return (ISC_R_UNEXPECTED); } if (WSAEventSelect(sock->fd, hEvent, type) != 0) { stat = WSAGetLastError(); isc__strerror(stat, strbuf, sizeof(strbuf)); msg = isc_msgcat_get(isc_msgcat, ISC_MSGSET_GENERAL, ISC_MSG_FAILED, "failed"); UNEXPECTED_ERROR(__FILE__, __LINE__, "WSAEventSelect: %s: %s", msg, strbuf); return (ISC_R_UNEXPECTED); } sock->hEvent = hEvent; sock->wait_type = type; notify_eventlist(sock, sock->manager, EVENT_ADD); return (ISC_R_SUCCESS);}/* * Note that the socket is not locked before calling this function */voidsocket_event_delete(isc_socket_t *sock) { REQUIRE(sock != NULL); REQUIRE(sock->hEvent != NULL); if (sock->hEvent != NULL) { sock->wait_type = 0; sock->pending_close = 1; notify_eventlist(sock, sock->manager, EVENT_DELETE); sock->hEvent = NULL; sock->hAlert = NULL; sock->evthread_id = 0; }}/* * Routine to cleanup and then close the socket. * Only close the socket here if it is NOT associated * with an event, otherwise the WSAWaitForMultipleEvents * may fail due to the fact that the the Wait should not * be running while closing an event or a socket. */voidsocket_close(isc_socket_t *sock) { REQUIRE(sock != NULL); sock->pending_close = 1; if (sock->hEvent != NULL) socket_event_delete(sock); else { closesocket(sock->fd); } if (sock->iocp) { sock->iocp = 0; InterlockedDecrement(&iocp_total); }}/* * Initialize socket services */BOOL InitSockets() { WORD wVersionRequested; WSADATA wsaData; int err; /* Need Winsock 2.0 or better */ wVersionRequested = MAKEWORD(2, 0); err = WSAStartup(wVersionRequested, &wsaData); if ( err != 0 ) { /* Tell the user that we could not find a usable Winsock DLL */ return(FALSE); } return(TRUE);}intinternal_sendmsg(isc_socket_t *sock, IoCompletionInfo *lpo, struct msghdr *messagehdr, int flags, int *Error){ int Result; DWORD BytesSent; DWORD Flags = flags; int total_sent; *Error = 0; Result = WSASendTo((SOCKET) sock->fd, messagehdr->msg_iov, messagehdr->msg_iovlen, &BytesSent, Flags, messagehdr->msg_name, messagehdr->msg_namelen, (LPOVERLAPPED) lpo, NULL); total_sent = (int) BytesSent; /* Check for errors.*/ if (Result == SOCKET_ERROR) { *Error = WSAGetLastError(); switch (*Error) { case WSA_IO_INCOMPLETE : case WSA_WAIT_IO_COMPLETION : case WSA_IO_PENDING : sock->pending_send++; case NO_ERROR : break; default : return (-1); break; } } else sock->pending_send++; if (lpo != NULL) return (0); else return (total_sent);}intinternal_recvmsg(isc_socket_t *sock, IoCompletionInfo *lpo, struct msghdr *messagehdr, int flags, int *Error){ DWORD Flags = 0; DWORD NumBytes = 0; int total_bytes = 0; int Result; *Error = 0; Result = WSARecvFrom((SOCKET) sock->fd, messagehdr->msg_iov, messagehdr->msg_iovlen, &NumBytes, &Flags, messagehdr->msg_name, (int *)&(messagehdr->msg_namelen), (LPOVERLAPPED) lpo, NULL); total_bytes = (int) NumBytes; /* Check for errors. */ if (Result == SOCKET_ERROR) { *Error = WSAGetLastError(); switch (*Error) { case WSA_IO_INCOMPLETE: case WSA_WAIT_IO_COMPLETION: case WSA_IO_PENDING: sock->pending_recv++; case NO_ERROR: break; default : return (-1); break; } } else sock->pending_recv++; /* Return the flags received in header */ messagehdr->msg_flags = Flags; if (lpo != NULL)
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -