📄 iocpserver.cpp
字号:
WaitEvents[MAX_COMPLETION_THREAD_COUNT],
hrc;
int endpointcount=0,
waitcount=0,
interval,
rc,
i;
struct addrinfo *res=NULL,
*ptr=NULL;
// Validate the command line
ValidateArgs(argc, argv);
// Load Winsock
if (WSAStartup(MAKEWORD(2,2), &wsd) != 0)
{
fprintf(stderr, "unable to load Winsock!\n");
return -1;
}
InitializeCriticalSection(&gSocketListCs);
InitializeCriticalSection(&gBufferListCs);
InitializeCriticalSection(&gPendingCritSec);
// Create the completion port used by this server
CompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, (ULONG_PTR)NULL, 0);
if (CompletionPort == NULL)
{
fprintf(stderr, "CreateIoCompletionPort failed: %d\n", GetLastError());
return -1;
}
// Find out how many processors are on this system
GetSystemInfo(&sysinfo);
if (sysinfo.dwNumberOfProcessors > MAX_COMPLETION_THREAD_COUNT)
{
sysinfo.dwNumberOfProcessors = MAX_COMPLETION_THREAD_COUNT;
}
// Round the buffer size to the next increment of the page size
if ((gBufferSize % sysinfo.dwPageSize) != 0)
{
gBufferSize = ((gBufferSize / sysinfo.dwPageSize) + 1) * sysinfo.dwPageSize;
}
printf("Buffer size = %lu (page size = %lu)\n",
gBufferSize, sysinfo.dwPageSize);
// Create the worker threads to service the completion notifications
for(waitcount=0; waitcount < (int)sysinfo.dwNumberOfProcessors ;waitcount++)
{
WaitEvents[waitcount] = CreateThread(NULL, 0, CompletionThread, (LPVOID)CompletionPort, 0, NULL);
if (WaitEvents[waitcount] == NULL)
{
fprintf(stderr, "CreatThread failed: %d\n", GetLastError());
return -1;
}
}
printf("Local address: %s; Port: %s; Family: %d\n",
gBindAddr, gBindPort, gAddressFamily);
// Obtain the "wildcard" addresses for all the available address families
res = ResolveAddress(gBindAddr, gBindPort, gAddressFamily, gSocketType, gProtocol);
if (res == NULL)
{
fprintf(stderr, "ResolveAddress failed to return any addresses!\n");
return -1;
}
// For each local address returned, create a listening/receiving socket
ptr = res;
while (ptr)
{
printf("Listening address: ");
PrintAddress(ptr->ai_addr, ptr->ai_addrlen);
printf("\n");
listenobj = (LISTEN_OBJ *)HeapAlloc(GetProcessHeap(), HEAP_ZERO_MEMORY, sizeof(LISTEN_OBJ));
if (listenobj == NULL)
{
fprintf(stderr, "Out of memory!\n");
return -1;
}
listenobj->LoWaterMark = gInitialAccepts;
InitializeCriticalSection(&listenobj->ListenCritSec);
// Save off the address family of this socket
listenobj->AddressFamily = ptr->ai_family;
// create the socket
listenobj->s = socket(ptr->ai_family, ptr->ai_socktype, ptr->ai_protocol);
if (listenobj->s == INVALID_SOCKET)
{
fprintf(stderr, "socket failed: %d\n", WSAGetLastError());
return -1;
}
// Create an event to register for FD_ACCEPT events on
listenobj->AcceptEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
if (listenobj->AcceptEvent == NULL)
{
fprintf(stderr, "CreateEvent failed: %d\n", GetLastError());
return -1;
}
listenobj->RepostAccept = CreateEvent(NULL, TRUE, FALSE, NULL);
if (listenobj->RepostAccept == NULL)
{
fprintf(stderr, "CreateSemaphore failed: %d\n", GetLastError());
return -1;
}
// Add both events (accept and repost) to the list of waited-on handles
WaitEvents[waitcount++] = listenobj->AcceptEvent;
WaitEvents[waitcount++] = listenobj->RepostAccept;
// Associate the listening socket and its LISTEN_OBJ with the completion port
hrc = CreateIoCompletionPort((HANDLE)listenobj->s, CompletionPort, (ULONG_PTR)listenobj, 0);
if (hrc == NULL)
{
fprintf(stderr, "CreateIoCompletionPort failed: %d\n", GetLastError());
return -1;
}
// bind the socket to a local address and port
rc = bind(listenobj->s, ptr->ai_addr, ptr->ai_addrlen);
if (rc == SOCKET_ERROR)
{
fprintf(stderr, "bind failed: %d\n", WSAGetLastError());
return -1;
}
// Need to load the Winsock extension functions from each provider
// -- e.g. AF_INET and AF_INET6.
rc = WSAIoctl(
listenobj->s,
SIO_GET_EXTENSION_FUNCTION_POINTER,
&guidAcceptEx,
sizeof(guidAcceptEx),
&listenobj->lpfnAcceptEx,
sizeof(listenobj->lpfnAcceptEx),
&bytes,
NULL,
NULL
);
if (rc == SOCKET_ERROR)
{
fprintf(stderr, "WSAIoctl: SIO_GET_EXTENSION_FUNCTION_POINTER failed: %d\n",
WSAGetLastError());
return -1;
}
// Load the Winsock extensions from each provider
rc = WSAIoctl(
listenobj->s,
SIO_GET_EXTENSION_FUNCTION_POINTER,
&guidGetAcceptExSockaddrs,
sizeof(guidGetAcceptExSockaddrs),
&listenobj->lpfnGetAcceptExSockaddrs,
sizeof(listenobj->lpfnGetAcceptExSockaddrs),
&bytes,
NULL,
NULL
);
if (rc == SOCKET_ERROR)
{
fprintf(stderr, "WSAIoctl: SIO_GET_EXTENSION_FUNCTION_POINTER faled: %d\n",
WSAGetLastError());
return -1;
}
// Put the socket into listening mode
rc = listen(listenobj->s, 200);
if (rc == SOCKET_ERROR)
{
fprintf(stderr, "listen failed: %d\n", WSAGetLastError());
return -1;
}
// Register for FD_ACCEPT notification on listening socket
rc = WSAEventSelect(listenobj->s, listenobj->AcceptEvent, FD_ACCEPT);
if (rc == SOCKET_ERROR)
{
fprintf(stderr, "WSAEventSelect failed: %d\n", WSAGetLastError());
return -1;
}
// Initiate the initial accepts for each listen socket
for(i=0; i < gInitialAccepts ;i++)
{
acceptobj = GetBufferObj(gBufferSize);
if (acceptobj == NULL)
{
fprintf(stderr, "Out of memory!\n");
return -1;
}
acceptobj->PostAccept = listenobj->AcceptEvent;
InsertPendingAccept(listenobj, acceptobj);
PostAccept(listenobj, acceptobj);
}
//
// Maintain a list of the listening socket structures
//
if (ListenSockets == NULL)
{
ListenSockets = listenobj;
}
else
{
listenobj->next = ListenSockets;
ListenSockets = listenobj;
}
endpointcount++;
ptr = ptr->ai_next;
}
// free the addrinfo structure for the 'bind' address
freeaddrinfo(res);
gStartTime = gStartTimeLast = GetTickCount();
interval = 0;
while (1)
{
rc = WSAWaitForMultipleEvents(
waitcount,
WaitEvents,
FALSE,
5000,
FALSE
);
if (rc == WAIT_FAILED)
{
fprintf(stderr, "WSAWaitForMultipleEvents failed: %d\n", WSAGetLastError());
break;
}
else if (rc == WAIT_TIMEOUT)
{
interval++;
PrintStatistics();
if (interval == 36)
{
int optval,
optlen;
// For TCP, cycle through all the outstanding AcceptEx operations
// to see if any of the client sockets have been connected but
// haven't received any data. If so, close them as they could be
// a denial of service attack.
listenobj = ListenSockets;
while (listenobj)
{
EnterCriticalSection(&listenobj->ListenCritSec);
acceptobj = listenobj->PendingAccepts;
while (acceptobj)
{
optlen = sizeof(optval);
rc = getsockopt(
acceptobj->sclient,
SOL_SOCKET,
SO_CONNECT_TIME,
(char *)&optval,
&optlen
);
if (rc == SOCKET_ERROR)
{
fprintf(stderr, "getsockopt: SO_CONNECT_TIME failed: %d\n",
WSAGetLastError());
}
else
{
// If the socket has been connected for more than 5 minutes,
// close it. If closed, the AcceptEx call will fail in the
// completion thread.
if ((optval != 0xFFFFFFFF) && (optval > 300))
{
printf("closing stale handle\n");
closesocket(acceptobj->sclient);
acceptobj->sclient = INVALID_SOCKET;
}
}
acceptobj = acceptobj->next;
}
LeaveCriticalSection(&listenobj->ListenCritSec);
listenobj = listenobj->next;
}
interval = 0;
}
}
else
{
int index;
index = rc - WAIT_OBJECT_0;
for( ; index < waitcount ; index++)
{
rc = WaitForSingleObject(WaitEvents[index], 0);
if (rc == WAIT_FAILED || rc == WAIT_TIMEOUT)
{
continue;
}
if (index < (int)sysinfo.dwNumberOfProcessors)
{
// One of the completion threads exited
// This is bad so just bail - a real server would want
// to gracefully exit but this is just a sample ...
ExitProcess(-1);
}
else
{
// An FD_ACCEPT or repost-accept event occurred
listenobj = ListenSockets;
while (listenobj)
{
if ((listenobj->AcceptEvent == WaitEvents[index]) ||
(listenobj->RepostAccept == WaitEvents[index]))
break;
listenobj = listenobj->next;
}
if (listenobj)
{
WSANETWORKEVENTS ne;
int limit=0;
if (listenobj->AcceptEvent == WaitEvents[index])
{
// EnumNetworkEvents to see if FD_ACCEPT was set
rc = WSAEnumNetworkEvents(
listenobj->s,
listenobj->AcceptEvent,
&ne
);
if (rc == SOCKET_ERROR)
{
fprintf(stderr, "WSAEnumNetworkEvents failed: %d\n",
WSAGetLastError());
}
if ((ne.lNetworkEvents & FD_ACCEPT) == FD_ACCEPT)
{
// We got an FD_ACCEPT so post multiple accepts to
// cover the burst
limit = BURST_ACCEPT_COUNT;
}
}
else if (listenobj->RepostAccept == WaitEvents[index])
{
// The RepostAccept event is signaled
limit = InterlockedExchange(&listenobj->RepostCount, 0);
ResetEvent(listenobj->RepostAccept);
}
i = 0;
while ( (i++ < limit) &&
(listenobj->PendingAcceptCount < gMaxAccepts) )
{
acceptobj = GetBufferObj(gBufferSize);
if (acceptobj)
{
acceptobj->PostAccept = listenobj->AcceptEvent;
InsertPendingAccept(listenobj, acceptobj);
PostAccept(listenobj, acceptobj);
}
}
}
}
}
}
}
WSACleanup();
return 0;
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -